Глава 8. Проектирование параллельных программ

В этой главе:

■ Методы распределения данных между потоками.

■ Факторы, влияющие на производительность параллельного кода.

■ Как от этих факторов зависит дизайн параллельных структур данных.

■ Безопасность многопоточного кода относительно исключений.

■ Масштабируемость.

■ Примеры реализации параллельных алгоритмов.

В предыдущих главах мы в основном занимались появившимися в новом стандарте C++11 средствами для написания параллельных программ. В главах 6 и 7 мы видели, как эти средства применяются для проектирования простых структур данных, безопасных относительно доступа из нескольких потоков. Но как столяру недостаточно знать, как устроена петля или шарнир, чтобы смастерить шкаф или стол, так и для проектирования параллельных программ недостаточно знакомства с устройством и применением простых структур данных. Теперь мы будем рассматривать проблему в более широком контексте и научимся строить более крупные узлы, способные выполнять полезную работу В качестве примеров я возьму многопоточные реализации некоторых алгоритмов из стандартной библиотеки С++, но те же самые принципы остаются в силе при разработке всех уровней приложения.

Как и в любом программном проекте, тщательное продумывание структуры параллельного кода имеет первостепенное значение. Однако в параллельной программе приходится учитывать еще больше факторов, чем в последовательной. Нужно принимать во внимание не только инкапсуляцию, связанность и сцепленность (эти вопросы подробно рассматриваются во многих книгах по проектированию программного обеспечения), но и то, какие данные разделять, как организовывать доступ к ним, каким потокам придётся ждать других потоков для завершения операции и т.д.

Именно этим мы и будем заниматься в этой главе, начав с высокоуровневых (но фундаментальных) вопросов о том, сколько создавать потоков, какой код выполнять в каждом потоке и как многопоточность влияет на понятность программы, и закончив низкоуровневыми деталями того, как организовать разделяемые данные для достижения оптимальной производительности.

Начнем с методов распределения работы между потоками.

8.1. Методы распределения работы между потоками

Представьте, что вам поручили построить дом. Для этого предстоит вырыть котлован под фундамент, возвести стены, провести водопровод и электропроводку и т.д. Теоретически, имея достаточную подготовку, все это можно сделать и в одиночку, по времени уйдет много и придётся постоянно переключаться с одного занятия на другое. Можно вместо этого нанять несколько помощников. Тогда нужно решить, сколько людей нанимать и каких специальностей. К примеру, пригласить двух универсалов и всё делать совместно. По-прежнему нужно будет переходить от одной работы к другой, но в итоге строительство удастся завершить быстрее, так как теперь в нем принимает участие больше народу

Есть и другой путь — нанять бригаду узких специалистов: каменщика, плотника, электрика, водопроводчика и других. Специалисты делают то, что лучше всего умеют, так что если прокладывать трубы не надо, то водопроводчик попивает чаёк. Работа все равно продвигается быстрее, так как занято больше людей, — водопроводчик может заниматься туалетом, пока электрик делает проводку на кухне, но зато когда для конкретного специалиста работы нет, он простаивает. Впрочем, несмотря на простои, специалисты, скорее всего, справятся быстрее, чем бригада мастеров на все руки. Им не нужно менять инструменты, да и свое дело они по идее должны делать быстрее, чем универсалы. Так ли это на самом деле, зависит от разных обстоятельств — нет другого пути, как взять и попробовать.

Но и специалистов можно нанять много или мало. Наверное, имеет смысл нанимать больше каменщиков, чем электриков. Кроме того, состав бригады и ее общая эффективность могут измениться, если предстоит построить несколько домов. Если в одном доме у водопроводчика мало работы, то на строительстве нескольких его можно занять полностью. Наконец, если вы не обязаны платить специалистам за простой, то можно позволить себе нанять больше людей, пусть даже в каждый момент времени работать будет столько же, сколько раньше.

Но хватит уже о строительстве, какое отношение всё это имеет к потокам? Да просто к ним применимы те же соображения. Нужно решить, сколько использовать потоков и какие задачи поручить каждому Должны ли потоки быть «универсалами», берущимися за любую подвернувшуюся работу или «специалистами», которые умеют хорошо делать одно дело? Или, быть может, нужно сочетание того и другого? Эти решения необходимо принимать вне зависимости от причин распараллеливания программы и от того, насколько они будут удачны, существенно зависит производительность и ясность кода. Поэтому так важно понимать, какие имеются варианты; тогда решения о структуре приложения будут опираться на знания, а не браться с потолка. В этом разделе мы рассмотрим несколько методов распределения задач и начнем с распределения данных между потоками.

8.1.1. Распределение данных между потоками до начала обработки

Легче всего распараллеливаются такие простые алгоритмы, как

std::for_each
, которые производят некоторую операцию над каждым элементом набора данных. Чтобы распараллелить такой алгоритм, можно назначить каждому элементу один из обрабатывающих потоков. Каким образом распределить элементы для достижения оптимальной производительности, зависит от деталей структуры данных, как мы убедимся ниже в этой главе.

Простейший способ распределения данных заключается в том, чтобы назначить первые N элементов одному потоку, следующие N — другому и так далее, как показано на рис. 8.1, хотя это и не единственное решение. Как бы ни были распределены данные, каждый поток обрабатывает только назначенные ему элементы, никак не взаимодействуя с другими потоками до тех пор, пока не завершит обработку.

Рис. 8.1. Распределение последовательных блоков данных между потоками

Такая организация программы знакома каждому, кто программировал в системах Message Passing Interface (МРI)[17] или OpenMP[18]: задача разбивается на множество параллельных подзадач, рабочие потоки выполняют их параллельно, а затем результаты объединяются на стадии редукции. Этот подход применён в примере функции

accumulate
из раздела 2.4; в данном случае и параллельные задачи, и финальный шаг редукции представляют собой аккумулирование. Для простого алгоритма
for_each
финальный шаг отсутствует, так как нечего редуцировать.

Понимание того, что последний шаг является редукцией, важно; в наивной реализации, представленной в листинге 2.8, финальная редукция выполняется как последовательная операция. Однако часто и ее можно распараллелить; операция

accumulate
по сути дела сама является редукцией, поэтому функцию из листинга 2.8 можно модифицировать таким образом, чтобы она вызывала себя рекурсивно, если, например, число потоков больше минимального количества элементов, обрабатываемых одним потоком. Или запрограммировать рабочие потоки так, чтобы по завершении задачи они выполняли некоторые шаги редукции, а не запускать каждый раз новые потоки.

Хотя это действенная техника, применима она не в любой ситуации. Иногда данные не удается заранее распределить равномерно, а как это сделать, становится понятно только в процессе обработки. Особенно наглядно это проявляется в таких рекурсивных алгоритмах, как Quicksort; здесь нужен другой подход.

8.1.2. Рекурсивное распределение данных

Алгоритм Quicksort состоит из двух шагов: разбиение данных на две части — до и после опорного элемента в смысле требуемого упорядочения, и рекурсивная сортировка обеих «половин». Невозможно распараллелить этот алгоритм, разбив данные заранее, потому что состав каждой «половины» становится известен только в процессе обработки элементов. Поэтому распараллеливание по необходимости должно быть рекурсивным. На каждом уровне рекурсии производится больше вызовов функции

quick_sort
, потому что предстоит отсортировать элементы, меньшие опорного, и большие опорного. Эти рекурсивные вызовы не зависят друг от друга, так как обращаются к разным элементам, поэтому являются идеальными кандидатами для параллельного выполнения. На рис. 8.2 изображено такое рекурсивное разбиение.

Рис. 8.2. Рекурсивное разбиение данных

В главе 4 была приведена подобная реализация. Вместо того чтобы просто вызывать функцию рекурсивно для большей и меньшей «половины», мы с помощью

std::async()
на каждом шаге запускали асинхронную задачу для меньшей половины. Вызывая
std::async()
, мы просим стандартную библиотеку С++ самостоятельно решить, имеет ли смысл действительно выполнять задачу в новом потоке или лучше сделать это синхронно.

Это существенно: при сортировке большого набора данных запуск нового потока для каждого рекурсивного вызова быстро приводит к образованию чрезмерного количества потоков. Как мы увидим ниже, когда потоков слишком много, производительность может не возрасти, а наоборот упасть. Кроме того, если набор данных очень велик, то потоков может просто не хватить. Сама идея рекурсивного разбиения на задачи хороша, нужно только строже контролировать число запущенных потоков. В простых случаях с этим справляется

std::async()
, но есть и другие варианты.

Один из них — воспользоваться функцией

std::thread::hardware_concurrency()
для выбора нужного числа потоков, как мы делали в параллельной версии
accumulate()
из листинга 2.8. Тогда вместо того чтобы запускать новый поток для каждого рекурсивного вызова, мы можем просто поместить подлежащий сортировке блок данных в потокобезопасный стек типа того, что был описан в главах 6 и 7. Если потоку больше нечего делать — потому что он закончил обработку всех своих блоков или потому что ждет, когда необходимый ему блок будет отсортирован, то он может взять блок из стека и заняться его сортировкой.

В следующем листинге показана реализация этой идеи.


Листинг 8.1. Параллельный алгоритм Quicksort с применением стека блоков, ожидающих сортировки

template

struct sorter { ←
(1)

 struct chunk_to_sort {

  std::list data;

  std::promise > promise;

 };


 thread_safe_stack chunks; ←
(2)

 std::vector threads;     ←
(3)

 unsigned const max_thread_count;

 std::atomic end_of_data;


 sorter():

  max_thread_count(std::thread::hardware_concurrency() - 1),

  end_of_data(false) {}


 ~sorter() {      ←
(4)

  end_of_data = true; ←
(5)


  for (unsigned i = 0; i < threads.size(); ++i) {

  threads[i].join(); ←
(6)

  }

 }


 void try_sort_chunk() {

  boost::shared_ptr chunk = chunks.pop();←
(7)

  if (chunk) {

  sort_chunk(chunk); ←
(8)

  }

 }


 std::list do_sort(std::list& chunk_data) { ←
(9)

  if (chunk_data.empty()) {

  return chunk_data;

  }


  std::list result;

  result.splice(result.begin(), chunk_data, chunk_data.begin());

  T const& partition_val = *result.begin();


  typename std::list::iterator divide_point = ←
(10)

  std::partition(chunk_data.begin(), chunk_data.end(),

   [&](T const& val) {return val < partition_val; });

  chunk_to_sort new_lower_chunk;

  new_lower_chunk.data.splice(new_lower_chunk.data.end(),

  chunk_data, chunk_data.begin(),

  divide_point);

  std::future > new_lower =

  new_lower_chunk.promise.get_future();

 chunks.push(std::move(new_lower_chunk)); ←
(11)

  if (threads.size() < max_thread_count) { ←
(12)

  threads.push_back(std::thread(&sorter::sort_thread, this));

 }


  std::list new_higher(do_sort(chunk_data));


  result.splice(result.end(), new_higher);

  while (new_lower.wait_for(std::chrono::seconds(0)) !=

  std::future_status::ready) { ←
(13)

  try_sort_chunk();       ←
(14)

  }


  result.splice(result.begin(), new_lower.get());

  return result;

 }


 void sort_chunk(boost::shared_ptr const& chunk) {

  chunk->promise.set_value(do_sort(chunk->data));←
(15)

 }


 void sort_thread() {

  while (!end_of_data) {   ←
(16)

  try_sort_chunk();     ←
(17)

  std::this_thread::yield();←
(18)

  }

 }

};


template

std::list parallel_quick_sort(std::list input) { ←
(19)

 if (input.empty()) {

  return input;

 }

 sorter s;


 return s.do_sort(input); ←
(20)

}

Здесь функция

parallel_quick_sort
(19) делегирует большую часть работы классу
sorter
(1), который объединяет стек неотсортированных блоков (2) с множеством потоков (3). Основные действия производятся в функции-члене
do_sort
(9), которая занимается обычным разбиением данных на две части (10). Но на этот раз она не запускает новый поток для каждого блока, а помещает его в стек (11) и запускает новый поток, только если еще есть незанятые процессоры (12). Поскольку меньший блок, возможно, обрабатывается другим потоком, нам необходимо дождаться его готовности (13). Чтобы не простаивать зря (в том случае, когда данный поток единственный или все остальные уже заняты), мы пытаемся обработать блок, находящийся в стеке (14). Функция
try_sort_chunk
извлекает поток из стека (7), сортирует его (8) и сохраняет результаты в обещании
promise
, так чтобы их смог получить поток, который поместил в стек данный блок (15).

Запущенные потоки крутятся в цикле, пытаясь отсортировать блоки, находящиеся в стеке (17), ожидая, пока будет установлен флаг

end_of_data
(16). В промежутке между проверками они уступают процессор другим потокам (18), чтобы у тех был шанс поместить в стек новые блоки. Эта программа полагается на то, что деструктор класса
sorter
(4) разберется с запущенными потоками. Когда все данные будут отсортированы,
do_sort
вернет управление (хотя рабочие потоки все еще выполняются), так что главный поток вернется из
parallel_quick_sort
(20) и, стало быть, уничтожит объект
sorter
. В этот момент деструктор поднимет флаг
end_of_data
(5) и дождется завершения рабочих потоков (6). Поднятый флаг является для функции потока указанием выйти из цикла (16).

При таком подходе не возникает проблемы неограниченного количества потоков, с которой мы сталкивались, когда

spawn_task
каждый раз запускает новый поток. С другой стороны, мы не полагаемся на то, что стандартная библиотека С++ выберет количество рабочих потоков за нас, как то происходит при использовании
std::async()
. Мы сами ограничиваем число потоков значением
std::thread::hardware_concurrency()
, чтобы избежать чрезмерно большого количества контекстных переключений. Однако же взамен нас поджидает другая потенциальная проблема: управление потоками и взаимодействие между ними сильно усложняют код. Кроме того, хотя потоки и обрабатывают разные элементы данных, все они обращаются к стеку для добавления новых блоков и извлечения блоков для сортировки. Из-за этой острой конкуренции производительность может снизиться, пусть даже мы используем свободный от блокировок (и, значит, неблокирующий) стек. Почему так происходит, мы увидим чуть ниже.

Этот подход представляет собой специализированную версию пула потоков — существует набор потоков, которые получают задачи из списка ожидающих работ, выполняют их, а затем снова обращаются к списку. Некоторые потенциальные проблемы, свойственные пулам потоков (в том числе конкуренция за список работ), и способы их решения рассматриваются в главе 9. Задача масштабирования приложения на несколько процессоров более детально обсуждается в этой главе ниже (см. раздел 8.2.1).

Как при предварительном, так и при рекурсивном распределении данных мы предполагаем, что сами данные фиксированы заранее, а нам нужно лишь найти удачный способ их разбиения. Но так бывает не всегда; если данные порождаются динамически или поступают из внешнего источника, то такой подход не годится. В этом случае имеет смысл распределять работу по типам задач, а не по данным.

8.1.3. Распределение работы по типам задач

Распределение работы между потоками путем назначения каждому потоку блока данных (заранее или рекурсивно во время обработки) все же опирается на предположение о том, что все потоки производят одни те же действия с данными. Альтернативный подход — заводить специализированные потоки, каждый из которых решает отдельную задачу — как водопроводчики и электрики на стройке. Потоки могут даже работать с одними и теми же данными, но обрабатывать их по-разному.

Такой способ распределения работы — следствие распараллеливания ради разделения обязанностей; у каждого потока есть своя задача, которую он решает независимо от остальных. Время от времени другие потоки могут поставлять нашему потоку данные или генерировать события, на которые он должен отреагировать, но в общем случае каждый поток занимается тем, для чего предназначен. В принципе, это хороший дизайн — у каждой части кода есть своя зона ответственности.

Распределение работы по типам задач с целью разделения обязанностей

Однопоточное приложение должно как-то разрешать противоречия с принципом единственной обязанности, если существует несколько задач, которые непрерывно выполняются в течение длительного промежутка времени, или если приложение должно обрабатывать поступающие события (например, нажатия на клавиши или входящие сетевые пакеты) своевременно, несмотря на наличие других задач. Для решения этой проблемы мы обычную вручную пишем код, который выполняет кусочек задачи А, потом кусочек задачи В, потом проверяет, не нажата ли клавиша и не пришёл ли сетевой пакет, а потом возвращается в начало цикла, чтобы выполнить следующий кусочек задачи А. В результате код задачи А оказывается усложнен из-за необходимости сохранять состояние и периодически возвращать управление главному циклу. Если выполнять в этом цикле чрезмерно много задач, то скорость работы упадёт, а пользователю придётся слишком долго ждать реакции на нажатие клавиши. Уверен, все вы встречались с крайними проявлениями этого феномена не в одном так в другом приложении: вы просите программу выполнить какое-то действие, после чего интерфейс вообще перестаёт реагировать, пока оно не завершится.

Тут-то и приходят на выручку потоки. Если выполнять каждую задачу в отдельном потоке, то всю эту работу возьмет на себя операционная система. В коде для решения задачи А вы можете сосредоточить внимание только на этой задаче и не думать о сохранении состояния, возврате в главный цикл и о том, сколько вам понадобится времени. Операционная система автоматически сохранит состояние и в подходящий момент переключится на задачу В или С, а если система оснащена несколькими процессорами или ядрами, то задачи А и В могут даже выполняться действительно параллельно. Код обработки клавиш или сетевых пакетов теперь будет работать без задержек, и все остаются в выигрыше: пользователь своевременно получает отклик от программы, а разработчик может писать более простой код, так как каждый поток занимается только тем, что входит в его непосредственные обязанности, не отвлекаясь на управление порядком выполнения или на взаимодействие с пользователем.

Звучит, как голубая мечта. Да возможно ли такое? Как всегда, дьявол кроется в деталях. Если всё действительно независимо и потокам не нужно общаться между собой, то это и вправду легко. Увы, мир редко бывает таким идеальным. Эти замечательные фоновые потоки часто выполняют какие-то запросы пользователя и должны уведомлять пользователя о завершении, обновляя графический интерфейс. А то еще пользователь вздумает отменить задачу, и тогда интерфейс должен каким-то образом послать потоку сообщение с требованием остановиться. В обоих случаях необходимо все тщательно продумать и правильно синхронизировать, хотя обязанности таки разделены. Поток пользовательского интерфейса занимается только интерфейсом, но должен обновлять его но требованию других потоков. Аналогично поток, занятый фоновой задачей, выполняет только операции, свойственные данной задаче, но не исключено, что одна из этих обязанностей заключается в том, чтобы остановиться по просьбе другого потока. Потоку все равно, откуда поступил запрос, важно лишь, что он адресован ему и относится к его компетенции.

На пути разделения обязанностей между потоками нас подстерегают две серьезные опасности. Во-первых, мы можем неправильно разделить обязанности. Признаками такой ошибки является большой объем разделяемых данных или положение, при котором потоки должны ждать друг друга; то и другое означает, что взаимодействие между потоками избыточно интенсивно. Когда такое происходит, нужно понять, чем вызвана необходимость взаимодействия. Если все сводится к какой-то одной причине, то, быть может, соответствующую обязанность следует поручить отдельному потоку, освободив от нее всех остальных. Наоборот, если два потока много взаимодействуют друг с другом и мало — с остальными, то, возможно, имеет смысл объединить их в один.

Распределяя задачи по типам, не обязательно полностью изолировать их друг от друга. Если к нескольким наборам входных данных требуется применять одну последовательность операций, то работу можно разделить так, что каждый поток будет выполнять какой-то один этап этой последовательности.

Распределение последовательности задач между потоками

Если задача заключается в применении одной последовательности операций ко многим независимым элементам данных, то можно организовать распараллеленный конвейер. Здесь можно провести аналогию с физическим конвейером: данные поступают с одного конца, подвергаются ряду операций и выходят с другого конца.

Чтобы распределить работу таким образом, следует создать отдельный поток для каждого участка конвейера, то есть для каждой операции. По завершении операции элемент данных помещается в очередь, откуда его забирает следующий поток. В результате поток, выполняющий первую операцию, сможет приступить к обработке следующего элемента, пока второй поток трудится над первым элементом.

Такая альтернатива стратегии распределения данных между потоками, которая была описана в разделе 8.1.1, хорошо приспособлена для случаев, когда входные данные вообще неизвестны до начала операции. Например, данные могут поступать по сети или первой в последовательности операцией может быть просмотр файловой системы на предмет нахождения подлежащих обработке файлов.

Конвейеры хороши также тогда, когда каждая операция занимает много времени; распределяя между потоками задачи, а не данные, мы изменяем качественные показатели производительности. Предположим, что нужно обработать 20 элементов данных на машине с четырьмя ядрами, и обработка каждого элемента состоит из четырех шагов, по 3 секунды каждый. Если распределить данные между потоками, то каждому потоку нужно будет обработать 5 элементов. В предположении, что нет никаких других операций, которые могли бы повлиять на хронометраж, по истечении 12 секунд будет обработано четыре элемента, по истечении 24 секунд — 8 элементов и т.д. На обработку всех 20 элементов уйдет 1 минута. Если организовать конвейер, ситуация изменится. Четыре шага можно распределить между четырьмя ядрами. Теперь первый элемент будет обрабатываться каждым из четырех ядер, и в целом на это уйдет 12 секунд — как и раньше. На самом деле, по истечении 12 секунд в нас обработан всего один элемент, что хуже, чем при распределении данных. Однако после того как конвейер запущен, работа идет немного по-другому; первое ядро, обработав первый элемент, переходит ко второму. Поэтому, когда последнее ядро закончит обработку первого элемента, второй уже будет наготове. В результате каждые три секунды на выходе получается очередной обработанный элемент, тогда как раньше элементы выходили пачками по четыре каждые 12 секунд.

Суммарное время обработки всего пакета может увеличиться, потому что придётся подождать 9 секунд, пока первый элемент доберется до последнего ядра. Но более плавная обработка в некоторых случаях предпочтительнее. Возьмем, к примеру, систему воспроизведения цифрового видео высокой четкости. Чтобы фильм можно было смотреть без напряжения, частота кадров должна быть не менее 25 в секунду, а лучше — больше. Кроме того, временные промежутки между кадрами должны быть одинаковы для создания иллюзии непрерывного движения. Приложение, способное декодировать 100 кадров секунду, никому не нужно, если оно секунду ждет, потом «выплевывает» сразу 100 кадров, потом еще секунду ждет и снова выдает 100 кадров. С другой стороны, зритель не будет возражать против двухсекундной задержки перед началом просмотра. В такой ситуации распараллеливание с помощью конвейера, позволяющее выводить кадры с постоянной скоростью, безусловно, предпочтительнее.

Познакомившись с различными способами распределения работы между потоками, посмотрим теперь, какие факторы влияют на производительность многопоточной системы и как от них зависит выбор решения.

8.2. Факторы, влияющие на производительность параллельного кода

Если вы распараллеливаете программу, чтобы повысить ее быстродействие в системе с несколькими процессорами, то должны знать о том, какие факторы вообще влияют на производительность. Но и в том случае, когда потоки применяются просто для разделения обязанностей, нужно позаботиться о том, чтобы многопоточность не привела к снижению быстродействия. Пользователи не скажут спасибо, если приложение будет работать на новенькой 16-ядерной машине медленнее, чем на старой одноядерной.

Как мы скоро увидим, на производительность многопоточного кода оказывают влияние многие факторы и даже решение о том, какой элемент данных в каком потоке обрабатывать (при прочих равных условиях) может кардинально изменить скорость работы программы. А теперь без долгих слов приступим к изучению этих факторов и начнем с самого очевидного: сколько процессоров имеется в системе?

8.2.1. Сколько процессоров?

Количество (и конфигурация) процессоров — первый из существенных факторов, влияющих на производительность многопоточного приложения. Иногда вы точно знаете о том, на каком оборудовании будет работать программа и можете учесть это при проектировании, произведя реальные измерения на целевой системе или ее точной копии. Если так, то вам крупно повезло; как правило, разработчик лишен такой роскоши. Быть может, программа пишется на похожей системе, но различия могут оказаться весьма значимыми. Например, вы разрабатывали на двух- или четырехъядерной машине, а у заказчика один многоядерный процессор (с произвольным числом ядер), или несколько одноядерных или даже несколько многоядерных процессоров. Поведение и характеристики производительности программы могут существенно зависеть от таких деталей, поэтому нужно заранее продумывать возможные последствия и тестировать в максимально разнообразных конфигурациях.

В первом приближении один 16-ядерный процессор эквивалентен четырем 4-ядерным или 16 одноядерным, во всех случаях одновременно могут выполняться 16 потоков. Чтобы в полной мере задействовать имеющийся параллелизм, в программе должно быть не менее 16 потоков. Если их меньше, то вычислительная мощность используется не полностью (пока оставляем за скобками тот факт, что могут работать и другие приложения). С другой стороны, если готовых к работе (не заблокированных в ожидании чего-то) потоков больше 16, то приложение будет попусту растрачивать процессорное время на контекстное переключение, о чем мы уже говорили в главе 1. Такая ситуация называется превышением лимита (oversubscription).

Чтобы приложение могло согласовать количество потоков с возможностями оборудования, в стандартной библиотеке Thread Library имеется функция

std::thread::hardware_concurrency()
. Мы уже видели, как ее можно использовать для определения подходящего количества потоков.

Использовать

std::thread::hardware_concurrency()
напрямую следует с осторожностью; ваш код не знает, какие еще потоки работают в программе, если только вы не сделали эту информацию разделяемой. В худшем случае, когда несколько потоков одновременно вызывают функцию, которая принимает решение о масштабировании с помощью
std::thread::hardware_concurrency()
, превышение лимита получится очень большим. Функция
std::async()
решает эту проблему, потому что библиотека знает обо всех обращениях к ней и может планировать потоки с учетом этой информации. Избежать этой трудности помогают также пулы потоков, если пользоваться ими с умом.

Однако даже если вы учитываете все потоки в своем приложении, остаются еще другие запущенные в системе программы. Вообще-то в однопользовательских системах редко запускают одновременно несколько счетных задач, но бывают области применения, где это обычное дело. Если система проектировалась специально под такие условия, то обычно в ней есть механизмы, позволяющие каждому приложению заказать подходящее количество потоков, хотя они и выходят за рамки стандарта С++. Один из вариантов — аналог

std::async()
, который при выборе количества потоков учитывает общее число асинхронных задач, выполняемых всеми приложениями. Другой — ограничение числа процессорных ядер, доступных данному приложению. Лично я ожидал бы, что это ограничение будет отражено в значении, которое возвращает функция
std::thread::hardware_concurrency()
на таких платформах, однако это не гарантируется. Если вас интересует подобный сценарий, обратитесь в документации.

Положение осложняется еще и тем, что идеальный алгоритм для решения конкретной задачи может зависеть от размерности задачи в сравнении с количеством процессорных устройств. Если имеется массивно параллельная система, где процессоров очень много, то алгоритм с большим числом операций может завершиться быстрее алгоритма с меньшим числом операций, потому что каждый процессор выполняет лишь малую толику общего числа операций.

По мере роста числа процессоров возникает и еще одна проблема, влияющая на производительность: обращение к общим данным со стороны нескольких процессоров.

8.2.2. Конкуренция за данные и перебрасывание кэша

Если два потока, одновременно выполняющиеся на разных процессорах, читают одни и те же данные, то обычно проблемы не возникает — данные просто копируются в кэши каждого процессора. Но если один поток модифицирует данные, то изменение должно попасть в кэш другого процессора, а на это требуется время. В зависимости от характера операций в двух потоках и от упорядочения доступа к памяти, модификация может привести к тому, что один процессор должен будет остановиться и подождать, пока аппаратура распространит изменение. С точки зрения процессора, это феноменально медленная операция, эквивалентная многим сотням машинных команд, хотя точное время зависит в основном от физической конструкции оборудования.

Рассмотрим следующий простой фрагмент кода:

std::atomic counter(0);

void processing_loop() {

 while(counter.fetch_add(

  1, std::memory_order_relaxed) < 100000000) {

  do_something();

 }

}

Переменная

counter
глобальная, поэтому любой поток, вызывающий
processing_loop()
, изменяет одну и ту же переменную. Следовательно, после каждого инкремента процессор должен загрузить в свой кэш актуальную копию
counter
, модифицировать ее значение и сделать его доступным другим процессорам. И хотя мы задали упорядочение
std::memory_order_relaxed
, чтобы процессору не нужно было синхронизироваться с другими данными,
fetch_add
— это операция чтения-модификации-записи и, значит, должна получить самое последнее значение переменной. Если другой поток на другом процессоре выполняет этот же код, то значение
counter
придётся передавать из кэша одного процессора в кэш другого, чтобы при каждом инкременте процессор видел актуальное значение
counter
. Если функция
do_something()
достаточно короткая или этот код исполняет много процессоров, то дело кончится тем, что они будут ожидать друг друга; один процессор готов обновить значение, но в это время другой уже обновляет, поэтому придётся дождаться завершения операции и распространения изменения. Такая ситуация называется высокой конкуренцией. Если процессорам редко приходится ждать друг друга, то говорят о низкой конкуренции.

Подобный цикл приводит к тому, что значение counter многократно передается из одного кэша в другой. Это явление называют перебрасыванием кэша (cache ping-pong), оно может серьезно сказаться на производительности приложения. Когда процессор простаивает в ожидании передачи в кэш, он не может делать вообще ничего, даже если имеются другие потоки, которые могли бы заняться полезной работой. Так что ничего хорошего в этом случае приложению не светит.

Быть может, вы думаете, что к вам это не относится — ведь в вашей-то программе таких циклов нет. Да так ли? А как насчет блокировок мьютексов? Когда программа захватывает мьютекс в цикле, она выполняет очень похожий код — с точки зрения доступа к данным. Чтобы захватить мьютекс, поток должен доставить составляющие мьютекс данные своему процессору и модифицировать их. Затем он снова модифицирует мьютекс, чтобы освободить его, а данные мьютекса необходимо передать следующему потоку, желающему его захватить. Время передачи добавляется к времени, в течение которого второй поток должен ждать, пока первый освободит мьютекс:

std::mutex m;

my_data data;

void processing_loop_with_mutex() {

 while (true) {

  std::lock_guard lk(m);

  if (done_processing(data)) break;

 }

}

А теперь самое печальное: если к данным и мьютексу обращаются сразу несколько потоков, то при увеличении числа ядер и процессоров ситуация только ухудшается, то есть возрастает вероятность получить высокую конкуренцию из-за того, что процессоры ждут друг друга. Если вы запускаете несколько потоков для ускорения обработки одних и тех же данных, то потоки начинают конкурировать за данные и, следовательно, за один и тот же мьютекс. Чем потоков больше, чем вероятнее, что они будут пытаться одновременно захватить мьютекс или получить доступ к атомарной переменной или еще что-нибудь в этом роде.

Последствия конкуренции за мьютексы и за атомарные переменные обычно разнятся по той простой причине, что мьютекс сериализует потоки на уровне операционной системы, а не процессора. Если количество готовых к выполнению потоков достаточно, то операционная система может запланировать один поток, пока другой ожидает мьютекса. Напротив, застывший процессор прекращает выполнение работающего на нем потока. Тем не менее, он оказывает влияние на производительность потоков, конкурирующих за мьютекс, — в конце концов, по определению в каждый момент времени может выполняться только один из них.

В главе 3 мы видели, что редко обновляемую структуру данных можно защитить мьютексом типа «несколько читателей — один писатель» (см. раздел 3.3.2). Эффект перебрасывания кэша может свести на нет преимущества такого мьютекса при неподходящей рабочей нагрузке, потому что все потоки, обращающиеся к данным (пусть даже только для чтения) все равно должны модифицировать сам мьютекс. По мере увеличения числа процессоров, обращающихся к данным, конкуренция за мьютекс возрастает, и строку кэша, в которой находится мьютекс, приходится передавать между ядрами, что увеличивает время захвата и освобождения мьютекса до неприемлемого уровня. Существуют приёмы, позволяющие сгладить остроту этой проблемы; суть их сводится к распределению мьютекса между несколькими строками кэша, но если вы не готовы реализовать такой мьютекс самостоятельно, то должны будете мириться с тем, что дает система.

Если перебрасывание кэша — это так плохо, то можно ли его избежать? Ниже в этой главе мы увидим, что ответ лежит в русле общих рекомендаций но улучшению условий для распараллеливания: делать все возможное для того, чтобы сократить конкуренцию потоков за общие ячейки памяти.

Но это не так-то просто — впрочем, просто никогда не бывает. Даже если к некоторой ячейке памяти в каждый момент времени обращается только один поток, перебрасывание кэша все равно возможно из-за явления, которое называется ложным разделением (false sharing).

8.2.3. Ложное разделение

Процессорные кэши имеют дело не с отдельными ячейками памяти, а с блоками ячеек, которые называются строками кэша. Обычно размер такого блока составляет 32 или 64 байта, в зависимости от конкретного процессора. Поскольку аппаратный кэш оперирует блоками памяти, небольшие элементы данных, находящиеся в смежных ячейках, часто оказываются в одной строке кэша. Иногда это хорошо: с точки зрения производительности лучше, когда данные, к которым обращается поток, находятся в одной, а не в нескольких строках кэша. Однако, если данные, оказавшиеся в одной строке кэша, логически не связаны между собой и к ним обращаются разные потоки, то возможно возникновение неприятной проблемы.

Допустим, что имеется массив значений типа

int
и множество потоков, каждый из которых обращается к своему элементу массива, в том числе для обновления, причём делает это часто. Поскольку размер
int
обычно меньше строки кэша, то в одной строке окажется несколько значений. Следовательно, хотя каждый поток обращается только к своему элементу, перебрасывание кэша все равно имеет место. Всякий раз, как поток хочет обновить значение в позиции 0, вся строка кэша должна быть передана процессору, на котором этот поток исполняется. И для чего? Только для того, чтобы потом быть переданной процессору, на котором исполняется поток, желающий обновить элемент в позиции 1. Строка кэша разделяется, хотя каждый элемент данных принадлежит только одному потоку. Отсюда и название ложное разделение. Решение заключается в том, чтобы структурировать данные таким образом, чтобы элементы, к которым обращается один поток, находились в памяти рядом (и, следовательно, с большей вероятностью попали в одну строку кэша), а элементы, к которым обращаются разные потоки, отстояли далеко друг от друга и попадали в разные строки кэша. Как это влияет на проектирование кода и данных, вы узнаете ниже.

Если обращение из разных потоков к данным, находящимся в одной строке кэша, — зло, то как влияет на общую картину расположение в памяти данных, к которым обращается один поток?

8.2.4. Насколько близки ваши данные?

Ложное разделение вызвано тем, что данные, нужные одному потоку, расположены в памяти слишком близко к данным другого потока. Но есть и еще одна связанная с размещением данных в памяти проблема, которая влияет на производительность одного потока безотносительно к прочим. Эта локальность данных — если данные, к которым обращается поток, разбросаны по памяти, то велика вероятность, что они находятся в разных строках кэша. И наоборот, если данные потока расположены близко друг к другу, то они с большей вероятностью принадлежат одной строке кэша. Следовательно, когда данные разбросаны, из памяти в кэш процессора приходится загружать больше строк кэша, что увеличивает задержку памяти и снижает общую производительность.

Кроме того, если данные разбросаны, то возрастают шансы на то, что строка кэша, содержащая данные текущего потока, содержит также данные других потоков. В худшем случае может оказаться, что кэш содержит больше ненужных вам данных, чем нужных. Таким образом, впустую растрачивается драгоценная кэш-память и, значит, количество безрезультатных обращений к кэшу увеличивается, и процессору приходится загружать данные из основной памяти, хотя они уже когда-то находились в кэше, но были вытеснены.

Да, но ведь это относится к однопоточным программам, я-то тут при чем? — спросите вы. А все дело в контекстном переключении. Если количество потоков в системе превышает количество ядер, то каждое ядро будет исполнять несколько потоков. Это увеличивает давление на кэш, поскольку мы пытаемся сделать так, чтобы разные потоки обращались к разным строкам кэша — во избежание ложного разделения. Следовательно, при переключении потоков процессором вероятность перезагрузки строк кэша возрастает, если данные каждого потока находятся в разных строках кэша. Если потоков больше, чем ядер или процессоров, то операционная система может назначить потоку один процессор в течение одного кванта времени и другой — в следующем кванте. В результате строки кэша, содержащие данные этого потока, придётся передавать из одного кэша в другой. Чем больше таких передач, тем больше на них уходит времени. Конечно, операционные системы стараются избежать такого развития событий, но иногда это все же происходит и приводит к падению производительности.

Проблемы, связанные с контекстным переключением, возникают особенно часто, когда количество готовых к выполнению потоков намного превышает количество ожидающих. Мы уже говорили об этом феномене, который называется превышением лимита.

8.2.5. Превышение лимита и чрезмерное контекстное переключение

В многопоточных программах количество потоков нередко превышает количество процессоров, если только не используется массивно параллельное оборудование. Однако потоки часто тратят время на ожидание внешнего ввода/вывода, освобождения мьютекса, сигнала условной переменной и т.д., поэтому серьезных проблем не возникает. Наличие избыточных потоков позволяет приложению выполнять полезную работу, а не простаивать, пока потоки чего-то ждут.

Но не всегда это хорошо. Если избыточных потоков слишком много, то даже готовых к выполнению потоков будет больше, чем процессоров, и операционная система будет вынуждена интенсивно переключать потоки, чтобы никого не обделить временными квантами. В главе 1 мы видели, что это приводит к возрастанию накладных расходов на контекстное переключение, а также к проблемам с кэш-памятью из-за локальности. Превышение лимита может возникать, когда задача порождает новые потоки без ограничений, как, например, в алгоритме рекурсивной сортировки из главы 4, или когда количество потоков, естественно возникающих при распределении работы но типам задач, превышает количество процессоров, а рабочая нагрузка носит счетный характер и мало связана с вводом/выводом.

Количество потоков, запускаемых из-за особенностей алгоритма распределения данных, можно ограничить, как показано в разделе 8.1.2. Если же превышение лимита обусловлено естественным распределением работы, то тут ничего не поделаешь, остается разве что выбрать другой способ распределения. Но в таком случае для выбора подходящего распределения может потребоваться больше информации о целевой платформе, чем вы располагаете, поэтому заниматься этим следует лишь тогда, когда производительность неприемлема и можно убедительно продемонстрировать, что изменение способа распределения действительно повышает быстродействие.

Есть и другие факторы, влияющие на производительность многопоточной программы. Например, стоимость перебрасывания кэша может существенно зависеть от того, оснащена ли система двумя одноядерными процессорами или одним двухъдерным, даже если тип и тактовая частота процессоров одинаковы. Однако все основные факторы, эффект которых проявляется наиболее наглядно, были перечислены выше. Теперь рассмотрим, как от них зависит проектирование кода и структур данных.

8.3. Проектирование структур данных для повышения производительности многопоточной программы

В разделе 8.1 мы видели различные способы распределения работы между потоками, а в разделе 8.2 — факторы, от которых может зависеть производительность программы. Как воспользоваться этой информацией при проектировании структур данных для многопоточного кода? Этот вопрос отличается от рассмотренных в главах 6 и 7, где основное внимание было уделено проектированию структур данных, безопасных относительно одновременного доступа. В разделе 2 было показано, что размещение в памяти данных, используемых одним потоком, тоже может иметь значение, даже если эти данные ни с какими другими потоками не разделяются.

Основное, о чем нужно помнить при проектировании структур данных для многопоточной программы, — это конкуренция, ложное разделение и локальность данных. Все три фактора могут оказать большое влияние на производительность, так что нередко добиться улучшения удается, просто изменив размещение данных в памяти или распределение данных между потоками. Начнем с низко висящего плода: распределения элементов массива между потоками.

8.3.1. Распределение элементов массива для сложных операций

Допустим, что программа, выполняющая сложные математические расчеты, должна перемножить две больших квадратных матрицы. Элемент в левом верхнем углу результирующей матрицы получается следующим образом: каждый элемент первой строки левой матрицы умножается на соответственный элемент первого столбца правой матрицы и полученные произведения складываются. Чтобы получить элемент результирующей матрицы, расположенный на пересечении второй строки и первого столбца, эта операция повторяется для второй строки левой матрицы и первого столбца правой матрицы. И так далее. На рис. 8.3 показано, что элемент результирующей матрицы на пересечении второй строки и третьего столбца получается суммированием попарных произведений элементов второй строки левой матрицы и третьего столбца правой.

Рис. 8.3. Умножение матриц

Теперь предположим, что матрицы содержат по несколько тысяч строк и столбцов, иначе заводить несколько потоков для оптимизации умножения не имеет смысла. Обычно, если матрица не разрежена, то она представляется большим массивом в памяти, в котором сначала идут все элементы первой строки, потом все элементы второй строки и так далее. Следовательно, для перемножения матриц нам понадобятся три огромных массива. Чтобы добиться оптимальной производительности, мы должны внимательно следить за порядком доступа к данным, а особенно за операциями записи в третий массив.

Существует много способов распределить эту работу между потоками. В предположении, что строк и столбцов больше, чем имеется процессоров, можно поручить каждому потоку вычисление нескольких столбцов или строк результирующей матрицы или даже вычисление какой-то ее прямоугольной части.

В разделах 8.2.3 и 8.2.4 мы видели, что для улучшения использования кэша и предотвращения ложного разделения лучше, когда поток обращается к данным, находящимся в соседних ячейках, а не разбросанным но всей памяти. Если поток вычисляет множество столбцов, то читать ему придётся все значения из первой матрицы и соответственных столбцов второй матрицы, но писать только в назначенные ему столбцы результирующей матрицы. При том, что матрицы хранятся но строкам, это означает, что мы будем обращаться к N элементам первой строки результирующей матрицы, N элементам второй строки и т.д. (N — количество обрабатываемых столбцов). Другие потоки будут обращаться к другим элементам строк, поэтому, чтобы минимизировать вероятность ложного разделения, столбцы, вычисляемые каждым потоком, должны быть соседними, тогда будут соседними и N записываемых элементов в каждой строке. Разумеется, если эти N элементов занимают в точности целое число строк кэша, то ложного разделения вообще не будет, потому что каждый поток работает со своим набором строк кэша.

С другой стороны, если каждый поток вычисляет множество строк, то он должен будет прочитать все значения правой матрицы и значения из соответственных строк левой матрицы, но в результирующую матрицу записывать будет только строки. Поскольку матрицы хранятся но строкам, то поток будет обращаться ко всем элементам N строк. Если поручить потоку вычисление соседних строк, то он окажется единственным потоком, который пишет в эти строки, то есть будет владельцем непрерывного участка памяти, к которому больше никакие потоки не обращаются. Вероятно, это лучше, чем вычисление множества столбцов, потому что ложное разделение, если и может возникнуть, то только для нескольких последних элементов предыдущего и нескольких первых элементов следующего участка. Однако это предположение нуждается в подтверждении путем прямых измерений на целевой платформе.

А как насчет третьего варианта — вычисления прямоугольных блоков матрицы? Можно рассматривать его как комбинацию распределения но столбцам и по строкам. Поэтому шансы на ложное разделение такие же, как при распределении но столбцам. Если выбрать число столбцов так, чтобы минимизировать эту вероятность, то у распределения по блокам будет преимущество в части чтения — исходную матрицу придётся читать не целиком, а только те столбцы и строки, которые соответствуют назначенному потоку прямоугольному блоку. Рассмотрим конкретный пример умножения двух матриц размерностью 1000×1000. Всего в каждой матрице миллион элементов. При наличии 100 процессоров каждый из них сможет вычислить 10 строк, то есть 10 000 элементов. Однако для их вычисления процессору придётся прочитать всю правую матрицу (миллион элементов) плюс 10 000 элементов из соответственных строк левой матрицы, итого — 1 010 000 элементов. С другой стороны, если каждый процессор вычисляет блок 100×100 (те же 10 000 элементов), то нужно будет прочитать значения из 100 строк левой матрицы (100×1000 = 100 000 элементов) и 100 столбцов правой матрицы (еще 100 000). В итоге мы получаем только 200 000 прочитанных элементов, то есть в пять раз меньше по сравнению с первым случаем. Если читается меньше элементов, то сокращается вероятность отсутствия нужного значения в кэше, а, значит, производительность потенциально возрастает.

Поэтому лучше разбить результирующую матрицу на небольшие квадратные или почти квадратные блоки, а не вычислять строки целиком. Конечно, размер блока можно определять на этапе выполнения в зависимости от размерности матриц и числа имеющихся процессоров. Как обычно, если производительность существенна, то важно профилировать разные варианты решения на целевой архитектуре.

Но если вы не занимаетесь умножением матриц, то какую пользу можете извлечь из этого обсуждения? Да просто те же принципы применимы в любой ситуации, где нужно назначать потокам вычисление больших блоков данных. Тщательно изучите все аспекты доступа к данным и выявите потенциальные причины снижения производительности. Не исключено, что ваша задача обладает схожими характеристиками, позволяющими улучшить быстродействие всего лишь за счет изменения способа распределения работы без модификации основного алгоритма.

Итак, мы посмотрели, как порядок доступа к элементам массива может отразиться на производительности. А что можно сказать о других структурах данных?

8.3.2. Порядок доступа к другим структурам данных

По существу, к оптимизации доступа к другим структурам данных применимы те же принципы, что и для массивов.

• Попытайтесь выбрать распределение данных между потоками, так чтобы данные, расположенные по соседству, обрабатывались одним потоком.

• Попытайтесь минимизировать объем данных, к которым обращается каждый поток.

• Попытайтесь сделать так, чтобы данные, к которым обращаются разные потоки, находились достаточно далеко друг от друга, чтобы избежать ложного разделения.

Разумеется, к другим структурам данных применить эти принципы не так просто. Например, в двоичном дереве очень трудно выделить части, которые сами не являлись бы деревьями, а полезно это или нет, зависит от того, насколько дерево сбалансировано и на сколько частей его нужно разбить. К тому же, память для узлов деревьев по необходимости выделяется динамически, так что оказывается в разных частях кучи.

Само по себе то, что данные находятся в разных частях кучи, не страшно, но означает, что процессору придётся держать в кэше ячейки из разных участков памяти. На самом деле, это может быть даже хорошо. Если несколько потоков обходят дерево, то всем им нужно получать доступ к узлам. Однако если узлы содержат только указатели на реальные данные, то процессор должен будет загружать данные только по мере необходимости. Если данные модифицируются потоками, то за счет этого, возможно, удастся предотвратить падение производительности из-за ложного разделения между данными самого узла и данными, образующими структуру дерева.

Схожая проблема возникает для данных, защищенных мьютексом. Предположим, что имеется простой класс, содержащий какие-то элементы данных и защищающий их мьютекс. Для потока, захватывающего мьютекс, было бы идеально, чтобы мьютекс и данные были размещены в памяти рядом. Тогда необходимые ему данные уже находятся в кэше процессора, потому что были загружены вместе с мьютексом, когда поток модифицировал его для захвата. Но есть и оборотная сторона медали: другие потоки, пытающиеся захватить мьютекс, удерживаемый первым потоком, должны будут обратиться к той же памяти. Захват мьютекса обычно реализуется в виде атомарной операции чтения-модификации-записи ячейки памяти, принадлежащей мьютексу, с последующим вызовом ядра ОС, если мьютекс уже захвачен. Операция чтения-модификации-записи вполне может сделать недействительными хранящиеся в кэше данные. С точки зрения мьютекса, это несущественно, так как первый поток все равно не стал бы его трогать, пока не подойдёт время освобождения. Но если мьютекс находится в той же строке кэша, что и данные, которыми оперирует захвативший его поток, то получится, что производительность потока, владеющего мьютексом, надает только потому, что другой поток попытался захватить тот же мьютекс.

Один из способов проверить, приводит ли такого рода ложное разделение к проблемам, — добавить большие разделительные блоки фиктивных данных между данными, к которым одновременно обращаются разные потоки. Например, следующая структура:

struct protected_data {│
65536 на несколько

 std::mutex m;     │
порядков больше, чем

 char padding[65536]; ←┘
длина строки кэша

 my_data data_to_protect;

};

удобна для проверки конкуренции за мьютекс, а структура

struct my_data {

 data_item1 d1;

 data_item2 d2;

 char padding[65536];

};

my_data some_array[256];

— для проверки ложного разделения данных массива. Если в результате производительность повысится, значит, ложное разделение составляет проблему, и тогда можно либо оставить заполнитель, либо устранить ложное разделение, по-другому организовав доступ к данным.

Разумеется, порядок доступа к данным — не единственное, что нужно принимать во внимание при проектировании параллельных программ. Рассмотрим некоторые другие аспекты.

8.4. Дополнительные соображения при проектировании параллельных программ

До сих пор мы в этой главе рассматривали различные способы распределения работы между потоками, факторы, влияющие на производительность, и то, как от них зависит выбор порядка доступа к данным и самой структуры данных. Но этим проблематика проектирования параллельных программ не исчерпывается. Необходимо еще принимать во внимание такие вещи, как безопасность относительно исключений и масштабируемость. Говорят, что программа масштабируется, если ее производительность (в терминах повышения быстродействия или увеличения пропускной способности) возрастает при добавлении новых процессорных ядер. В идеале производительность должна расти линейно, то есть система с 100 процессорами должна работать в 100 раз быстрее системы с одним процессором.

Даже немасштабируемая программа может быть работоспособной — в конце концов, однопоточные приложения в этом смысле, безусловно, не масштабируемы — но вот безопасность относительно исключений — вопрос, напрямую связанный с корректностью. Если программа не безопасна относительно исключений, то может наблюдаться нарушение инвариантов, состояния гонки и даже аварийное завершение. Вот этим вопросом мы сейчас и займемся.

8.4.1. Безопасность относительно исключений в параллельных алгоритмах

Безопасность относительно исключений — необходимая составная часть любой приличной программы на С++, и параллельные программы — не исключение. На самом деле, при разработке параллельных алгоритмов часто требуется уделять исключениям даже больше внимания. Если какая-то операция в последовательном алгоритме возбуждает исключение, то алгоритм должен лишь позаботиться о предотвращении утечек памяти и нарушения собственных инвариантов, а потом может передать исключение вызывающей программе для обработки. В параллельных же алгоритмах многие операции выполняются в разных потоках. В этом случае исключение невозможно распространить вверх по стеку вызовов, потому что у каждого потока свой стек. Если выход из функции потока производится в результате исключения, то приложение завершается.

В качестве конкретного примера рассмотрим еще раз функцию

parallel_accumulate
из листинга 2.8, который воспроизведен ниже.


Листинг 8.2. Наивная параллельная организация

std::accumulate
(из листинга 2.8)

template

struct accumulate_block {

 void operator()(Iterator first, Iterator last, T& result) {

  result = std::accumulate(first, last, result); ←
(1)

 }

};


template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);←
(2)

 if (!length)

  return init;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

  hardware_threads != 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std: :vector results (num_threads); ←
(3)

 std::vector threads(num_threads - 1); ←
(4)


 Iterator block_start = first; ←
(5)

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start; ←
(6)

  std::advance(block_end, block_size);

  threads[i] = std::thread( ←
(7)

  accumulate_block(),

  block_start, block_end, std::ref(results[i]));

  block_start = block_end; ←
(8)

 }


 accumulate_block()(

  block_start, last, results[num_threads - 1]);←
(9)


 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join));

 return

  std::accumulate(results.begin(), results.end(), init); ←
(10)

}

Посмотрим, где могут возникнуть исключения. Вообще говоря, это вызовы библиотечных функций, которые могут возбуждать исключения, а также операции, определенные в пользовательском типе.

Итак, начнем. В точке (2) мы обращаемся к функции

distance
, которая выполняет операции над пользовательским типом итератора. Поскольку мы еще не начали работу, и обращение к этой функции произведено из вызывающего потока, то тут всё нормально. Далее мы выделяем память для векторов
results
(3) и
threads
(4). И эти обращения произведены из вызывающего потока до начала работы и до создания новых потоков, так что и здесь всё хорошо. Разумеется, если конструктор
threads
возбудит исключение, то нужно будет освободить память, выделенную для
results
, но об этом позаботится деструктор.

С инициализацией объекта

block_start
(5) всё чисто по тем же причинам, так что перейдём к операциям в цикле запуска потоков (6), (7), (8). Если после создания первого же потока (7) возникнет исключение, и мы его не перехватим, появится проблема; деструкторы объектов
std::thread
вызывают
std::terminate
, что приводит к аварийному завершению программы. Нехорошо.

Обращение к

accumulate_block
в точке (9) может возбуждать исключения — с точно такими же последствиями: объекты потоков будут уничтожены, а их деструкторы вызовут
std::terminate
. С другой стороны, исключение, возбуждаемое в последнем обращении к
std::accumulate
(10), не так опасно, потому что к этому моменту все потоки уже присоединились к вызывающему.

Таким образом, обращения к

accumulate_block
из новых потоков могут возбуждать исключения в точке (1). Так как блоки
catch
отсутствуют, то исключение останется необработанным и приведёт к вызову
std::terminate()
и завершению программы.

Если это еще не очевидно, скажем прямо: этот код не безопасен относительно исключений.

Делаем код безопасным относительно исключений

Итак, мы выявили все возможные точки возбуждения исключений и поняли, к каким печальным последствиям это приведёт. Что с этим можно сделать? Начнем с вопроса об исключениях, возбуждаемых в созданных нами потоках.

В главе 4 мы уже познакомились с подходящим для решения проблемы средством. Для чего нам вообще нужны новые потоки? Для того чтобы вычислить результат и при этом учесть возможность возникновения исключений. Но это именно то, для чего предназначено сочетание

std::packaged_task
и
std::future
. В листинге 8.3 показан код, переписанный с использованием
std::packaged_task
.


Листинг 8.3. Параллельная версия

std::accumulate
с применением
std::packaged_task

template

struct accumulate_block {

 T operator()(Iterator first, Iterator last) {←
(1)

  return std::accumulate(first, last, T());  ←
(2)

 }

};


template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return init;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread — 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

  hardware_threads i = 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std::vector > futures(num_threads-1);←
(3)

 std::vector threads(num_threads — 1);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task task( ←
(4)

  accumulate_block());

  futures[i] = task.get_future(); ←
(5)

  threads[i] =

  std::thread(std::move(task), block_start, block_end);←
(6)

  block_start = block_end;

 }

 T last_result = accumulate_block()(block_start, last); ←
(7)


 std::for_each(threads.begin(), threads.end(),

  std::mem_fn(&std::thread::join));


 T result = init; ←
(8)

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  result += futures[i].get(); ←
(9)

 }

 result += last_result; ←
(10)

 return result;

}

Первое изменение заключается в том, что оператор вызова в

accumulate_block
теперь возвращает результат по значению, а не принимает ссылку на место, где его требуется сохранить (1). Для обеспечения безопасности относительно исключений мы используем
std::packaged_task
и
std::future
, поэтому можем воспользоваться этим и для передачи результата. Правда, для этого требуется при вызове
std::accumulate
(2) явно передавать сконструированный по умолчанию экземпляр
T
, а не использовать повторно предоставленное значение
result
, но это не слишком существенное изменение.

Далее, вместо того заводить вектор результатов, мы создаем вектор

futures
(3), в котором храним объекты
std::future
для каждого запущенного потока. В цикле запуска потоков мы сначала создаем задачу для
accumulate_block
(4). В классе
std::packaged_task
объявлена задача, которая принимает два объекта
Iterator
и возвращает
T
, а это именно то, что делает наша функция. Затем мы получаем будущий результат для этой задачи (5) и исполняем ее в новом потоке, передавая начало и конец обрабатываемого блока (6). Результат работы задачи, равно как и исключение, если оно возникнет, запоминается в объекте
future
.

Поскольку используются будущие результаты, массива

results
больше нет, поэтому мы должны сохранить результат обработки последнего блока в переменной (7), а не в элементе массива. Кроме того, поскольку мы получаем значения из будущих результатов, проще не вызывать
std::accumulate
, а написать простой цикл
for
, в котором к переданному начальному значению (8) будут прибавляться значения, полученные из каждого будущего результата (9). Если какая-то задача возбудит исключение, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к
get()
. Наконец, перед тем как возвращать окончательный результат вызывающей программе, мы прибавляем результат обработки последнего блока (10).

Таким образом, мы устранили одну из потенциальных проблем: исключения, возбужденные в рабочих потоках, повторно возбуждаются в главном. Если исключения возникнут в нескольких рабочих потоках, то вверх распространится только одно, но это не очень страшно. Если вы считаете, что это все-таки важно, то можете воспользоваться чем-то вроде класса

std::nested_exception
, чтобы собрать все такие исключения и передать их главному потоку.

Осталось решить проблему утечки потоков в случае, когда исключение возникает между моментом запуска первого потока и присоединением всех запущенных. Для этого проще всего перехватить любое исключение, дождаться присоединения потоков, которые все еще находятся в состоянии

joinable()
, а потом возбудить исключение повторно:

try {

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  // ... как и раньше

 }

 T last_result = accumulate_block()(block_start, last);

 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join));

} catch (...) {

 for (unsigned long i = 0; i < (num_thread - 1); ++i) {

 if (threads[i].joinable())

  thread[i].join();

 }

 throw;

}

Теперь все работает. Все потоки будут присоединены вне зависимости от того, как завершилась обработка блока. Однако блоки

try-catch
выглядят некрасиво, и часть кода дублируется. Мы присоединяем потоки как в «нормальной» ветке, так и в блоке
catch
. Дублирование кода — вещь почти всегда нежелательная, потому что изменения придётся вносить в несколько мест. Давайте лучше перенесём этот код в деструктор — ведь именно такова идиома очистки ресурсов в С++. Вот как выглядит этот класс:

class join_threads {

 std::vector& threads;

public:

 explicit join_threads(std::vector& threads_):

  threads(threads_) {}


 ~join_threads() {

  for (unsigned long i = 0; i < threads.size(); ++i) {

  if (threads[i].joinable())

   threads[i].join();

  }

 }

};

Это похоже на класс

thread_guard
из листинга 2.3, только на этот раз мы «охраняем» целый вектор потоков. Теперь наш код упрощается.


Листинг 8.4. Безопасная относительно исключений версия

std::accumulate

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return init;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

  hardware_threads i = 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std::vector > futures(num_threads — 1);

 std::vector threads(num_threads - 1);

 join_threads joiner(threads); ←
(1)


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task task(

  accumulate_block());

  futures[i] = task.get_future();

  threads[i] =

  std::thread(std:move(task), block_start, block_end);

  block_start = block_end;

 }

 T last_result = accumulate_block()(block_start, last);


 T result = init;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  result + = futures[i].get(); ←
(2)

 }

 result += last_result;

 return result;

}

Создав контейнер потоков, мы создаем объект написанного выше класса (1), который присоединяет все завершившиеся потоки. Теперь явный цикл присоединения можно удалить, точно зная, что при выходе из функции потоки будут присоединены. Отметим, что вызовы

futures[i].get()
(2) приостанавливают выполнение программы до готовности результатов, поэтому в этой точке явно присоединять потоки не нужно. Этим данный код отличается от первоначальной версии в листинге 8.2, где присоединять потоки было необходимо для нрав ильного заполнения вектора
results
. Мало того что мы получили безопасный относительно исключений код, так еще и функция стала короче, потому что код присоединения вынесен в новый класс (который, кстати, можно использовать и в других программах).

Обеспечение безопасности относительно исключений при работе с
std::async()

Теперь, когда мы знаем, что необходимо для обеспечения безопасности относительно исключений в программе, которая явно управляет потоками, посмотрим, как добиться того же результата при использовании

std::async()
. Мы уже видели, что в этом случае управление потоками берет на себя библиотека, а запущенный ей поток завершается, когда будущий результат готов. В плане безопасности относительно исключений важно то, что если уничтожить будущий результат, не дождавшись его, то деструктор будет ждать завершения потока. Тем самым мы элегантно решаем проблему утечки потоков, которые еще исполняются и хранят ссылки на данные. В следующем листинге показана безопасная относительно исключений реализация с применением
std::async()
.


Листинг 8.5. Безопасная относительно исключений версия

std::accumulate
с применением
std::async

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);←
(1)

 unsigned long const max_chunk_size = 25;

 if (length <= max_chunk_size) {

  return std::accumulate(first, last, init); ←
(2)

 } else {

  Iterator mid_point = first;

  std::advance(mid_point, length / 2); ←
(3)

  std::future first_half_result =

  std::async(parallel_accumulate, ←
(4)

  first, mid_point, init);

  T second_half_result =

  parallel_accumulate(mid_point, last, T());     ←
(5)

  return first_half_result.get() + second_half_result;←
(6)

 }

}

В этой версии мы распределяем данные рекурсивно, а не вычисляем распределение по блокам заранее, но в целом код намного проще предыдущей версии и при этом безопасен относительно исключений. Как и раньше, сначала мы вычисляем длину последовательности (1), и, если она меньше максимального размера блока, то вызываем

std::accumulate
напрямую (2). В противном случае находим среднюю точку последовательности (3) и запускаем асинхронную задачу, которая будет обрабатывать левую половину (4). Для обработки правой половины мы вызываем себя рекурсивно (5), а затем складываем результаты обработки обеих половин (6). Библиотека гарантирует, что
std::async
задействует имеющийся аппаратный параллелизм, не создавая слишком большого количества потоков. Некоторые «асинхронные» вызовы на самом деле будут исполняться синхронно при обращении к
get()
(6).

Изящество этого решения не только в том, что задействуется аппаратный параллелизм, но и в том, что безопасность относительно исключений обеспечивается тривиальным образом. Если рекурсивный вызов (5) возбудит исключение, то будущий результат, созданный при обращении к

std::async
(4), будет уничтожен при распространении исключения вверх по стеку. Это в свою очередь приведёт к ожиданию завершения асинхронного потока, так что «висячий поток» не образуется. С другой стороны, если исключение возбуждает асинхронный вызов, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к
get()
(6).

Какие еще соображения следует принимать во внимание при проектировании параллельного кода? Давайте поговорим о масштабируемости. Насколько увеличится производительность программы, если запустить ее на машине с большим количеством процессоров?

8.4.2. Масштабируемость и закон Амдала

Под масштабируемостью понимается свойство программы использовать дополнительные имеющиеся в системе процессоры. На одном полюсе находятся однопоточные приложения, которые вообще не масштабируемы, — даже если система оснащена 100 процессорами, быстродействие такой программы не возрастет. На другом полюсе мы встречаем проект SETI@Home[19], который рассчитан на использование тысяч процессоров (в виде отдельных компьютеров, добровольно подключаемых к сети пользователями).

В многопоточной программе количество потоков, выполняющих полезную работу, может изменяться в процессе исполнения. Даже если каждый поток на всем протяжении своего существования делает что-то полезное, первоначально в приложении имеется всего один поток, который должен запустить все остальные. Но такой сценарий крайне маловероятен. Потоки часто не работают, а ожидают друг друга или завершения операций ввода/вывода.

Всякий раз, как один поток чего-то ждет (неважно, чего именно), а никакого другого потока, готового занять вместо него процессор, нет, процессор, который мог бы выполнять полезную работу, простаивает.

Упрощенно можно представлять, что программа состоит из «последовательных» участков, в которых полезные действия выполняет только один поток, и «параллельных», где задействованы все имеющиеся процессоры. Если программа исполняется на машине с большим числом процессоров, то теоретически «параллельные» участки могли бы завершаться быстрее, а «последовательные» такими бы и остались. Приняв такие упрощающие предположения, можно оценить потенциальное повышение производительности при увеличении количества процессоров: если доля «последовательных» участков равна fs, то коэффициент повышения производительности P при числе процессоров N составляет


Это закон Амдала, на который часто ссылаются при обсуждении производительности параллельных программ. Если код полностью распараллелен, то есть доля последовательных участков нулевая, то коэффициент ускорения равен N. Если же, к примеру, последовательные участки составляют треть программы, то даже при бесконечном количестве процессоров не удастся добиться ускорения более чем в три раза.

Конечно, эта картина чересчур упрощённая, потому что редко встречаются бесконечно делимые задачи, без чего это соотношение неверно, и не менее редко вся работа сводится только к процессорным вычислениям, как то предполагается в законе Амдала. Во время исполнения потоки могут ожидать разных событий.

Но из закона Амдала все же следует, что если целью распараллеливания является повышение производительности, то следует проектировать всё приложение так, чтобы процессорам всегда было чем заняться. За счет уменьшения длины «последовательных» участков или времени ожидания можно повысить выигрыш от добавления новых процессоров. Альтернативный подход — подать на вход системы больше данных и тем самым загрузить параллельные участки работой; при этом можно будет уменьшить долю последовательных участков и повысить коэффициент P.

По существу, масштабируемость — это возможность либо уменьшить время, затрачиваемое на какое-то действие, либо увеличить объем данных, обрабатываемых в единицу времени, при увеличении количества процессоров. Иногда оба свойства эквивалентны (можно обработать больше данных, если каждый элемент обрабатывается быстрее), но это необязательно. Прежде чем выбирать способ распределения работы между потоками, важно определить, какие аспекты масштабируемости представляют для вас наибольший интерес.

В начале этого раздела я уже говорил, что у потоков не всегда есть чем заняться. Иногда они вынуждены ждать другие потоки, завершения ввода/вывода или еще чего-то. Если на время этого ожидания загрузить систему какой-нибудь полезной работой, такое простаивание можно «скрыть».

8.4.3. Сокрытие латентности с помощью нескольких потоков

При обсуждении производительности многопоточного кода мы часто предполагали, что потоки трудятся изо всех сил и, получая в свое распоряжение процессор, всегда имеют полезную работу. Конечно, это не так — потоки часто оказываются блокированы в ожидании какого-то события: завершения ввода/вывода, освобождения мьютекса, завершения операции в каком-то другом потоке, сигнала условной переменной, готовности будущего результата… Наконец, они могут просто спать какое-то время.

Но какова бы ни была причина ожидания, если потоков столько же, сколько физических процессоров, наличие заблокированных потоков означает, что процессоры работают вхолостую. Процессор, который мог бы исполнять поток, вместо этого не делает ничего. Следовательно, если заранее известно, что какой-то поток будет проводить много времени в ожидании, то имеет смысл задействовать на это время процессор, запустив один или несколько дополнительных потоков.

Возьмем, к примеру, антивирусный сканер, который для распределения работы использует конвейер. Первый поток просматривает файловую систему и помещает имена файлов в очередь. Второй поток выбирает имена файлов из очереди и сканирует их на предмет наличия вирусов. Мы знаем, что поток просмотра файловой системы определённо будет простаивать в ожидании завершения ввода/вывода, поэтому «лишнее» процессорное время отдаем дополнительному потоку сканирования. Таким образом, у нас будет поток выбора файлов и столько потоков сканирования, сколько имеется процессоров. Поскольку потоку сканирования тоже нужно читать большие куски файлов, то имеет смысл еще увеличить количество таких потоков. Однако в какой-то момент потоков может стать слишком много, и система начнет работать медленнее, потому что вынуждена будет расходовать все больше и больше времени на контекстное переключение (см. раздел 8.2.5).

Такого рода настройка является оптимизацией, поэтому следует измерять производительность до и после изменения количества потоков; оптимальное их число зависит как от характера работы, так и от доли времени, затрачиваемой потоком на ожидание.

Иногда удается полезно использовать свободное процессорное время, не запуская дополнительные потоки. Например, если поток блокируется в ожидании завершения ввода/вывода, то имеет смысл воспользоваться асинхронным вводом/выводом, если платформа его поддерживает. Тогда поток сможет заняться полезной работой, пока ввод/вывод выполняется в фоне. С другой стороны, поток, ожидающий завершения операции в другом потоке, может в это время заняться чем-то полезным. Как это делается, мы видели при рассмотрении реализации свободной от блокировок очереди в главе 7. В крайнем случае, когда поток ждет завершения задачи, которая еще не была запущена другим потоком, ожидающий поток может сам выполнить эту задачу целиком или помочь в выполнении какой-то другой задачи. Такой пример мы видели в листинге 8.1, где функция сортировки пыталась отсортировать находящиеся в очереди блоки, пока блоки, которых она ждет, сортируются другими потоками.

Иногда потоки добавляются не для того, чтобы загрузить имеющиеся процессоры, а чтобы быстрее обрабатывать внешние события, то есть повысить быстроту реакции системы.

8.4.4. Повышение быстроты реакции за счет распараллеливания

Большинство современных графических интерфейсов пользователя являются событийно-управляемыми — пользователь выполняет в интерфейсе какие-то действия — нажимает клавиши или двигает мышь, в результате чего порождается последовательность событий или сообщений, которые приложение затем обрабатывает. Система может и сама порождать сообщения или события. Чтобы все события и сообщения были корректно обработаны, в приложении обычно присутствует цикл такого вида:

while (true) {

 event_data event = get_event();

 if (event.type == quit)

  break;

 process(event);

}

Детали API, конечно, могут отличаться, но структура всегда одна и та же: дождаться события, обработать его и ждать следующего. В однопоточном приложении такая структура затрудняет программирование длительных задач (см. раздел 8.1.3). Чтобы система оперативно реагировала на действия пользователя, функции

get_event()
и
process()
должны вызываться достаточно часто вне зависимости от того, чем занято приложение. Это означает, что задача должна либо периодически приостанавливать себя и возвращать управление циклу обработки событий, либо сама вызывать функции
get_event()
и
process()
в подходящих точках. То и другое решение усложняет реализацию задачи.

Применив распараллеливание для разделения обязанностей, мы сможем вынести длительную задачу в отдельный поток, а выделенному потоку GUI поручить обработку событий. В дальнейшем потоки могут взаимодействовать с помощью простых механизмов, и мешать код обработки событий с кодом задачи не придётся. В листинге ниже приведён набросок такого разделения обязанностей.


Листинг 8.6. Отделение потока GUI от потока задачи

std::thread task_thread;

std::atomic task_cancelled(false);


void gui_thread() {

 while (true) {

  event_data event = get_event();

  if (event.type == quit)

  break;

  process(event);

 }

}


void task() {

 while (!task_complete() && !task_cancelled) {

  do_next_operation();

 }

 if (task_cancelled) {

  perform_cleanup();

 } else {

  post_gui_event(task_complete);

 }

}


void process(event_data const& event) {

 switch(event.type) {

 case start_task:

  task_cancelled = false;

  task_thread = std::thread(task);

  break;

 case stop_task:

  task_cancelled = true;

  task_thread.join();

  break;

 case task_complete:

  task_thread.join();

  display_results();

  break;

 default:

  //...

 }

}

В результате такого разделения обязанностей поток пользовательского интерфейса всегда будет своевременно реагировать на события, даже если задача занимает много времени. Быстрота реакции часто является основной характеристикой приложения с точки зрения пользователя — с приложением, которое полностью зависает на время выполнения некоторой операции (неважно, какой именно), работать неприятно. За счет выделения специального потока для обработки событий пользовательский интерфейс может сам обрабатывать относящиеся к нему сообщения (например, изменение размера или перерисовка окна), не прерывая длительной операции, но передавая адресованные ей сообщения, если таковые поступают.

До сих пор в этой главе мы говорили о том, что следует учитывать при проектировании параллельного кода. Поначалу количество разных факторов может привести в изумление, но постепенно они войдут в плоть и кровь и станут вашей второй натурой. Если описанные выше соображения внове для вас, то, надеюсь, они станут яснее после того, как мы рассмотрим конкретные примеры многопоточного кода.

8.5. Проектирование параллельного кода на практике

В какой мере следует учитывать описанные выше факторы при проектировании, зависит от конкретной задачи. Для демонстрации мы рассмотрим реализацию параллельных версий трех функций из стандартной библиотеки С++. При этом у нас будет знакомая платформа, на которой можно изучать новые вещи. Попутно мы получим работоспособные версии функций, которые можно будет применить при распараллеливании более крупной программы.

Я ставил себе задачей продемонстрировать определенные приёмы, а не написать самый оптимальный код. Реализации, в которых лучше используется имеющееся оборудование, можно найти в академической литературе по параллельным алгоритмам или в специализированных многопоточных библиотеках типа Intel Threading Building Blocks[20].

Концептуально простейшим параллельным алгоритмом является параллельная версия

std::for_each
, с которой я и начну.

8.5.1. Параллельная реализация
std::for_each

Идея

std::for_each
проста — этот алгоритм вызывает предоставленную пользователем функцию для каждого элемента диапазона. Различие между параллельной и последовательной реализацией
std::for_each
заключается, прежде всего, в порядке вызовов функции. Стандартная версия
std::for_each
вызывает функцию сначала для первого элемента диапазона, затем для второго и так далее, тогда как параллельная версия не дает гарантий относительно порядка обработки элементов, они даже могут (и хочется надеяться, будут) обрабатываться параллельно.

Для реализации параллельной версии нужно всего лишь разбить диапазон на участки, которые будут обрабатываться каждым потоком. Количество элементов известно заранее, поэтому такое разбиение можно произвести до начала работы (см. раздел 8.1.1). Мы будем предполагать, что это единственная исполняемая параллельная задача, поэтому вычислить количество требуемых потоков можно с помощью функции

std::thread::hardware_concurrency()
. Мы также знаем, что элементы можно обрабатывать абсолютно независимо, поэтому для предотвращения ложного разделения (см. раздел 8.2.3) имеет смысл использовать соседние блоки.

По своей структуре этот алгоритм похож на параллельную версию

std::accumulate
, описанную в разделе 8.4.1, только вместо вычисления суммы элементов он применяет к ним заданную функцию. На первый взгляд, это должно бы существенно упростить код, потому что не нужно возвращать никакой результат. Но если мы собираемся передавать исключения вызывающей программе, то все равно придется воспользоваться механизмами
std::packaged_task
и
std::future
, чтобы передавать исключения из одного потока в другой. Ниже приведен пример реализации.


Листинг 8.7. Параллельная реализация

std::for_each

template

void parallel_for_each(Iterator first, Iterator last, Func f) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

  hardware_threads != 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std::vector > futures(num_threads - 1); ←
(1)

 std::vector threads(num_threads – 1);

 join_threads joiner(threads);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task task( ←
(2)

  [=]() {

   std::for_each(block_start, block_end, f);

  });

  futures[i] = task.get_future();

  threads[i] = std::thread(std::move(task)); ←
(3)

  block_start = block_end;

 }

 std::for_each(block_start, last, f);

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  futures[i].get(); ←
(4)

 }

}

Структурно код ничем не отличается от приведенного в листинге 8.4, что и неудивительно. Основное различие состоит в том, что в векторе futures хранятся объекты

std::future
(1), потому что рабочие потоки не возвращают значение, а в качестве задачи мы используем простую лямбда-функцию, которая вызывает функцию
f
для элементов из диапазона от
block_start
до
block_end
(2). Это позволяет не передавать конструктору потока (3) диапазон. Поскольку рабочие потоки ничего не возвращают, обращения к
futures[i].get()
(4) служат только для получения исключений, возникших в рабочих потоках; если мы не хотим передавать исключения, то эти обращения можно вообще опустить.

Реализацию

parallel_for_each
можно упростить, воспользовавшись
std::async
, — точно так же, как мы делали при распараллеливании
std::accumulate
.


Листинг 8.8. Параллельная реализация

std::for_each
с применением
std::async

template

void parallel_for_each(Iterator first, Iterator last, Func f) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return;


 unsigned long const min_per_thread = 25;


 if (length < (2 * min_per_thread)) {

  std::for_each(first, last, f); ←
(1)

 } else {

  Iterator const mid_point = first + length / 2;

  std::future first_half = ←
(2)

  std::async(¶llel_for_each,

  first, mid_point, f);

  parallel_for_each(mid_point, last, f); ←
(3)

  first_half.get();            ←
(4)

 }

}

Как и в случае реализации

parallel_accumulate
с помощью
std::async
в листинге 8.5, мы разбиваем данные рекурсивно в процессе выполнения, а не заранее, потому что не знаем, сколько потоков задействует библиотека. На каждом шаге данные делятся пополам, пока их не останется слишком мало для дальнейшего деления. При этом одна половина обрабатывается асинхронно (2), а вторая — непосредственно (3). Когда дальнейшее деление становится нецелесообразным, вызывается
std::for_each
(1). И снова использование
std::async
и функции-члена
get()
объекта
std::future
(4) обеспечивает семантику распространения исключения.

Теперь перейдем от алгоритмов, которые выполняют одну и ту же операцию над каждым элементом (к их числу относятся также

std::count
и
std::replace
), к чуть более сложному случаю —
std::find
.

8.5.2. Параллельная реализация
std::find

Далее будет полезно рассмотреть алгоритм

std::find
, потому что это один из нескольких алгоритмов, которые могут завершаться еще до того, как обработаны все элементы. Например, если уже первый элемент в диапазоне отвечает условию поиска, то рассматривать остальные не имеет смысла. Как мы скоро увидим, это свойство существенно для повышения производительности, и от него напрямую зависит структура параллельной реализации. На этом примере мы продемонстрируем, как порядок доступа к данным может оказать влияние на проектирование программы (см. раздел 8.3.2). К той же категории относятся алгоритмы
std::equal
и
std::any_of
.

Если вы вместе с женой или другом ищете какую-нибудь старую фотографию в сваленных на чердаке альбомах, то вряд ли захотите, чтобы они продолжали перелистывать страницы, когда вы уже нашли то, что нужно. Наверное, вы просто сообщите им, что искомое найдено (быть может, крикнув «Есть!»), чтобы они могли прекратить поиски и заняться чем-нибудь другим. Но многие алгоритмы по природе своей должны обработать каждый элемент и, стало быть, не имеют эквивалента восклицанию «Есть!». Для алгоритмов типа

std::find
умение «досрочно» прекращать работу — важное свойство, которое нельзя игнорировать. И, следовательно, его нужно учитывать при проектировании кода — закладывать возможность прерывания других задач, когда ответ уже известен, чтобы программа не ждала, пока прочие рабочие потоки обработают оставшиеся элементы.

Без прерывания других потоков последовательная версия может работать быстрее параллельной, потому что прекратит поиск, как только будет найден нужный элемент. Если, например, система поддерживает четыре параллельных потока, то каждый из них должен будет просмотреть четверть полного диапазона, поэтому при наивном распараллеливании каждый поток потратит на просмотр своих элементов четверть всего времени. Если искомый элемент окажется в первой четверти диапазона, то последовательный алгоритм завершится раньше, так как не должен будет просматривать оставшиеся элементы.

Один из способов прервать другие потоки — воспользоваться атомарной переменной в качестве флага и проверять его после обработки каждого элемента. Если флаг поднят, то один из потоков уже нашел нужный элемент, поэтому можно прекращать обработку. При таком способе прерывания мы не обязаны обрабатывать все элементы, и, значит, параллельная версия чаще будет обгонять последовательную. Недостаток состоит в том, что загрузка атомарной переменной — медленная операция, которая тормозит работу каждого потока.

Мы уже знаем о двух способах возврата значений и распространения исключений. Можно использовать массив будущих результатов и объекты

std::packaged_task
для передачи значений и исключений, после чего обработать частичные результаты в главном потоке. Или с помощью
std::promise
устанавливать окончательный результат прямо в рабочем потоке. Все зависит от того, как мы хотим обрабатывать исключения, возникающие в рабочих потоках. Если требуется остановиться при первом возникновении исключения (несмотря на то, что обработаны не все элементы), то можно использовать
std::promise
для передачи значения и исключения. С другой стороны, если мы хотим, чтобы рабочие потоки продолжали поиск, то используем
std::packaged_task
, сохраняем все исключения, а затем повторно возбуждаем одно из них, если искомый элемент не найден.

В данном случае я остановился на

std::promise
, потому что такое поведение больше походит на поведение
std::find
. Надо только не забыть о случае, когда искомого элемента в указанном диапазоне нет. Поэтому необходимо дождаться завершения всех потоков перед тем, как получать значение из будущего результата. Если просто блокировать исполнение при обращении к
get()
, то при условии, что искомого элемента нет, мы будем ждать вечно. Получившийся код приведён ниже.


Листинг 8.9. Параллельная реализация алгоритма

find()

template

Iterator parallel_find(Iterator first, Iterator last,

 MatchType match) {

 struct find_element { ←
(1)

  void operator()(Iterator begin, Iterator end,

  MatchType match,

  std::promise* result,

  std::atomic* done_flag) {

  try {

   for(; (begin != end) && !done_flag->load(); ++begin) {←
(2)

   if (*begin == match) {

    result->set_value(begin);←
(3)

    done_flag->store(true);  ←
(4)

    return;

   }

   }

  } catch (...) { ←
(5)

   try {

   result->set_exception(std::current_exception());←
(6)

   done_flag->store(true);

   } catch (...) ←
(7)

   {}

  }

  }

 };


 unsigned long const length = std::distance(first, last);


 if (!length)

  return last;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread — 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

  hardware_threads != 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std::promise result;   ←
(8)

 std::atomic done_flag(false);←
(9)

 std::vector threads(num_threads — 1); {←
(10)

  join_threads joiner(threads);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  threads[i] = std::thread(find_element(), ←
(11)

  block_start, block_end, match,

  &result, &done_flag);

  block_start = block_end;

 }

 find_element()(

  block_start, last, match, &result, &done_flag);←
(12)

 if (!done_flag.load()) { ←
(13)

  return last;

 }

 return result.get_future().get() 
(14)

}

В основе своей код в листинге 8.9 похож на предыдущие примеры. На этот раз вся работа производится в операторе вызова, определенном в локальном классе

find_element
(1). Здесь мы в цикле обходим элементы из назначенного потоку блока, проверяя флаг на каждой итерации (2). Если искомый элемент найден, то мы записываем окончательный результат в объект-обещание (3) и перед возвратом устанавливаем флаг
done_flag
(4).

Если было возбуждено исключение, то его перехватит универсальный обработчик (5) и попытается сохранить исключение в обещании (6) перед установкой

done_flag
. Но установка значения объекта-обещания может возбудить исключение, если значение уже установлено, поэтому мы перехватываем и игнорируем любые возникающие здесь исключения (7).

Это означает, что если поток, вызвавший

find_element
, найдет искомый элемент или возбудит исключение, то все остальные потоки увидят поднятый флаг
done_flag
и прекратят работу. Если несколько потоков одновременно найдут искомое или возбудят исключение, то возникнет гонка за установку результата в обещании. Но это безобидная гонка: победивший поток считается «первым», и установленный им результат приемлем.

В самой функции

parallel_find
мы определяем обещание (8) и флаг прекращения поиска (9), которые передаем новым потокам вместе с диапазоном для просмотра (11). Кроме того, главный поток пользуется классом
find_element
для поиска среди оставшихся элементов (12). Как уже отмечалось, мы должны дождаться завершения всех потоков перед тем, как проверять результат, потому что искомого элемента может вообще не оказаться. Для этого мы заключаем код запуска и присоединения потоков в блок (10), так что к моменту проверки флага (13) все потоки гарантировано присоединены. Если элемент был найден, то, обратившись к функции
get()
объекта
std::future
, мы либо получим результат из обещания, либо возбудим сохраненное исключение.

Как и раньше, в этой реализации предполагается, что мы собираемся использовать все доступные аппаратные потоки или располагаем каким-то механизмом, который позволит заранее определить количество потоков для предварительного разделения между ними работы. И снова мы можем упростить код, воспользовавшись функцией

std::async
и рекурсивным разбиением данных, если готовы принять автоматический механизм масштабирования, скрытый в стандартной библиотеке С++. Реализация
parallel_find
с применением
std::async
приведена в листинге ниже.


Листинг 8.10. Параллельная реализация алгоритма

find()
с применением
std::async

template
(1)

Iterator parallel_find_impl(Iterator first, Iterator last,

 MatchType match,

 std::atomic& done) {

 try {

  unsigned long const length = std::distance(first, last);

  unsigned long const min_per_thread = 25;      ←
(2)

  if (length < (2 * min_per_thread)) {        ←
(3)

  for(; (first != last) && !done.load(); ++first) {←
(4)

   if (*first == match) {

   done = true;                  ←
(5)

   return first;

   }

  }

  return last; ←
(6)

  } else {

  Iterator const mid_point = first + (length / 2);   ←
(7)

  std::future async_result =

   std::async(¶llel_find_impl,←
(8)

  mid_point, last, match, std::ref(done));

  Iterator const direct_result =

   parallel_find_impl(first, mid_point, match, done); ←
(9)

  return (direct_result == mid_point) ?

   async_result.get() : direct_result; ←
(10)

 }

 } catch (...) {

  done = true; ←
(11)

  throw;

 }

}


template

Iterator parallel_find(

 Iterator first, Iterator last, MatchType match) {

 std::atomic done(false);

 return parallel_find_impl(first, last, match, done); ←
(12)

}

Желание закончить поиск досрочно при обнаружении совпадения заставило нас ввести флаг, разделяемый между всеми потоками. Этот флаг, следовательно, нужно передавать во все рекурсивные вызовы. Проще всего сделать это, делегировав работу отдельной функции (1), которая принимает дополнительный параметр — ссылку на флаг

done
, передаваемый из главной точки входа (12).

Основная же ветвь кода не таит никаких неожиданностей. Как и во многих предыдущих реализациях, мы задаем минимальное количество элементов, обрабатываемых в одном потоке (2); если размер обеих половин диапазона меньше этой величины, то весь диапазон обрабатывается в текущем потоке (3). Собственно алгоритм сводится к простому циклу — он продолжается, пока не будет достигнут конец заданного диапазона или не установлен флаг

done
(4). При обнаружении совпадения мы устанавливаем флаг
done
и выходим из функции (5). Если мы дошли до конца списка или вышли из цикла, потому что другой поток установил флаг
done
, то возвращаем значение
last
, означающее, что совпадение не найдено (6).

Если диапазон можно разбивать, то мы сначала находим среднюю точку (7), а потом через

std::async
запускаем поиск во второй половине диапазона (8), не забыв передать ссылку на флаг
done
с помощью
std::ref
. Одновременно мы просматриваем первую половину диапазона, рекурсивно вызывая себя же (9). И асинхронный, и рекурсивный вызов могут разбивать диапазон и дальше, если он достаточно велик.

Если прямой рекурсивный вызов вернул

mid_point
, значит, он не нашел совпадения, поэтому нужно получить результат асинхронного поиска. Если и в той половине ничего не было найдено, то мы получим
last
(10). Если «асинхронный» вызов на самом деле был не асинхронным, а отложенным, то выполняться он начнет именно при обращении к
get()
; в таком случае поиск во второй половине списке вообще не будет производиться, если поиск в первой оказался успешным. Если же асинхронный поиск действительно выполнялся в другом потоке, то деструктор переменной
async_result
будет ждать завершения этого потока, поэтому утечки потоков не произойдет.

Как и раньше, применение

std::async
гарантирует безопасность относительно исключений и распространения исключений вверх по стеку вызовов. Если прямой рекурсивный вызов возбудит исключение, то деструктор будущего результата позаботится о том, чтобы поток, в котором работал асинхронный поиск, завершился до возврата из функции. Если исключение возбудит асинхронный вызов, то оно распространится вверх при вызове
get()
(10). Внешний блок
try/catch
нужен только для того, чтобы установить флаг
done
и обеспечить тем самым быстрое завершение всех потоков в случае исключения (11). Программа правильно работала бы и без этого, по продолжала бы сравнивать элементы до естественного завершения всех потоков.

Существенной особенностью обеих реализаций этого алгоритма (характерной и для других рассмотренных выше параллельных алгоритмов) является тот факт, что элементы могут обрабатываться не в том же порядке, что в стандартном алгоритме

std::find
. Это важный момент при распараллеливании любого алгоритма. Если порядок имеет значение, то обрабатывать элементы параллельно нельзя. В таких алгоритмах, как
parallel_for_each
, порядок обработки независимых элементов не важен, однако
parallel_find
может вернуть элемент, найденный где-то в конце диапазона, хотя имеется другой такой же элемент в начале. Это может оказаться неприятной неожиданностью.

Итак, нам удалось распараллелить

std::find
. В начале этого раздела я говорил, что существуют и другие алгоритмы, которые могут завершаться раньше, чем будут обработаны все элементы. К ним применима такая же техника. В главе 9 мы еще вернёмся к вопросу о прерывании потоков.

В последнем из трех примеров мы направимся в другую сторону и рассмотрим алгоритм

std::partial_sum
. Он не очень широко известен, но интересен с точки зрения распараллеливания, поскольку позволяет проиллюстрировать некоторые дополнительные проектные решения.

8.5.3. Параллельная реализация
std::partial_sum

Алгоритм

std::partial_sum
вычисляет частичные суммы по диапазону, то есть каждый элемент заменяется суммой всех элементов от начала диапазона до него самого включительно. Таким образом, последовательность 1, 2, 3, 4, 5 преобразуется в 1, (1+2)=3, (1+2+3)=6, (1+2+3+4)=10, (1+2+3+4+5)=15. С точки зрения распараллеливания этот алгоритм интересен тем, что невозможно разбить диапазон на части и обрабатывать каждый блок независимо. Действительно, значение первого элемента необходимо складывать с каждым из остальных элементов, так что независимой обработки не получается.

Один из возможных подходов — вычислить частичные суммы отдельных блоков, а затем прибавить полученное значение последнего элемента в первом блоке ко всем элементам в следующем блоке и так далее. Например, если исходную последовательность 1, 2, 3, 4, 5, 6, 7, 8, 9 разбить на три равных блока, то после первого прохода получатся блоки {1, 3, 6}, {4, 9, 15}, {7, 15, 24}. Если теперь прибавить 6 (значение последнего элемента в первом блоке) ко всем элементам второго блока, то получится {1, 3, 6}, {10, 15, 21}, {7, 15, 24}. Далее прибавляем последний элемент второго блока (21) ко всем элементам третьего блока и получаем окончательный результат: {1, 3, 6}, {10, 15, 21}, {28, 36, 55}.

Процедуру прибавления частичной суммы предыдущего блока ко всем элементам следующего также можно распараллелить. Если сначала изменить последний элемент блока, то оставшиеся элементы того же блока могут модифицироваться одним потоком, тогда как другой поток одновременно будет модифицировать следующий блок. И так далее. Такое решение хорошо работает, если количество элементов в списке намного превышает количество процессорных ядер, поскольку в этом случае у каждого ядра будет достаточно элементов для обработки на каждом этапе.

Если же процессорных ядер очень много (столько же, сколько элементов в списке, или больше), то описанный подход оказывается не столь эффективен. Если разбить всю работу между процессорами, то на первом шаге каждый процессор будет работать всего с двумя элементами. Но тогда на этапе распространения результатов большинство процессоров будут ждать, и хорошо бы их чем-то запять. В таком случае можно подойти к задаче по-другому. Вместо того чтобы сначала вычислять частичные суммы всех блоков, а затем распространять их от предыдущего к следующему, мы можем распространять суммы по частям. Сначала, как и раньше, вычисляем суммы соседних элементов. На следующем шаге каждое вычисленное значение прибавляется к элементу, отстоящему от него на расстояние 2. Затем новые значения прибавляются к элементам, отстоящим на расстояние 4, и так далее. Если начать с тех же самых девяти элементов, то после первого шага мы получим последовательность 1, 3, 5, 7, 9, 11, 13, 15, 17, в которой правильно вычислены первые два элемента. После второго шага получается последовательность 1, 3, 6, 10, 14, 18, 22, 26, 30, в которой правильны первые четыре элемента. После третьего мы получим последовательность 1, 3, 6, 10, 15, 21, 28, 36, 44, в которой правильны первые восемь элементов. И после четвертого шага получаем окончательный результат 1, 3, 6, 10, 15, 21, 28, 36, 45. Общее количество шагов здесь больше, чем в первом варианте, зато и возможности для распараллеливания на большое число процессоров шире — на каждом шаге каждый процессор может обновлять одно число.

Всего во втором варианте требуется выполнить log2(N) шагов по N операций на каждом шаге (по одной на процессор), где N — число элементов в списке. В первом же варианте каждый поток производит N/k операций для вычисления частичной суммы своего блока и еще N/k операций для распространения сумм (здесь k — число потоков). Следовательно, вычислительная сложность первого варианта в терминах количества операций составляет O(N), а второго — O(N log(N)). Однако, если процессоров столько же, сколько элементов в списке, то во втором варианте требуется произвести только log(N) операций на каждом процессоре, тогда как в первом при большом k операции из-за распространения частичных сумм вперед фактически сериализуются. Таким образом, если количество процессоров невелико, то первый алгоритм завершится быстрее, тогда как в массивно параллельных системах победителем окажется второй алгоритм. Это крайнее проявление феномена, обсуждавшегося в разделе 8.2.1.

Но оставим в стороне эффективность и перейдем к коду. В листинге 8.11 приведена реализация первого подхода.


Листинг 8.11. Параллельное вычисление частичных сумм путём разбиения задачи на части

template

void parallel_partial_sum(Iterator first, Iterator last) {

 typedef typename Iterator::value_type value_type;


 struct process_chunk { ←
(1)

  void operator()(Iterator begin, Iterator last,

  std::future* previous_end_value,

  std::promise* end_value) {

  try {

   Iterator end = last;

   ++end;

   std::partial_sum(begin, end, begin); ←
(2)

   if (previous_end_value) {      ←
(3)

   value_type& addend = previous_end_value->get();←
(4)

   *last += addend;        ←
(5)

   if (end_value) {

    end_value->set_value(*last); ←
(6)

   }

   std::for_each(begin, last, [addend](value_type& item) {←
(7)

    item += addend;

   });

   } else if (end_value) {

   end_value->set_value(*last); ←
(8)

   }

  } catch (...) { ←
(9)

   if (end_value) {

   end_value->set_exception(std::current_exception()); ←
(10)

   } else {

   throw; ←
(11)

   }

  }

  }

 };


 unsigned long const length = std::distance(first, last);


 if (!length)

  return last;


 unsigned long const min_per_thread = 25; ←
(12)


 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

  hardware_threads!= 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 typedef typename Iterator::value_type value_type;

 std::vector threads(num_threads - 1);←
(13)

 std::vector >

  end_values(num_threads - 1);           ←
(14)

 std::vector >

  previous_end_values;               ←
(15)

 previous_end_values.reserve(num_threads - 1);   ←
(16)

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_last = block_start;

  std::advance(block_last, block_size – 1); ←
(17)

  threads[i] = std::thread(process_chunk(), ←
(18)

  block_start, block_last,

  (i !=0 ) ? &previous_end_values[i - 1] : 0, &end_values[i]);

  block_start = block_last;

  ++block_start; ←
(19)

  previous_end_values.push_back(end_values[i].get_future());←
(20)

 }

 Iterator final_element = block_start;

 std::advance(

  final_element, std::distance(block_start, last) - 1);←
(21)

  process_chunk()(block_start, final_element, ←
(22)

  (num_threads > 1) ? &previous_end_values.back() : 0, 0);

}

Общая структура этого кода не отличается от рассмотренных ранее алгоритмов: разбиение задачи на блоки с указанием минимального размера блока, обрабатываемого одним потоком (12). В данном случае, помимо вектора потоков (13), мы завели вектор обещаний (14), в котором будут храниться значения последних элементов в каждом блоке, и вектор будущих результатов (15), используемый для получения последнего значения из предыдущего блока. Так как мы знаем, сколько всего понадобится будущих результатов, то можем заранее зарезервировать для них место в векторе (16), чтобы избежать перераспределения памяти при запуске потоков.

Главный цикл выглядит так же, как раньше, только на этот раз нам нужен итератор, который указывает на последний элемент в блоке (17), а не на элемент за последним, чтобы можно было распространить последние элементы поддиапазонов. Собственно обработка производится в объекте-функции

process_chunk
, который мы рассмотрим ниже; в качестве аргументов передаются итераторы, указывающие на начало и конец блока, а также будущий результат, в котором будет храниться последнее значение из предыдущего диапазона (если таковой существует), и объект-обещание для хранения последнего значения в текущем диапазоне (18).

Запустив поток, мы можем обновить итератор, указывающий на начало блока, не забыв продвинуть его на одну позицию вперед (за последний элемент предыдущего блока) (19), и поместить будущий результат, в котором будет храниться последнее значение в текущем блоке, в вектор будущих результатов, откуда он будет извлечён на следующей итерации цикла (20).

Перед тем как обрабатывать последний блок, мы должны получить итератор, указывающий на последний элемент (21), который передается в

process_chunk
(22). Алгоритм
std::partial_sum
не возвращает значения, поэтому по завершении обработки последнего блока больше ничего делать не надо. Можно считать, что операция закончилась, когда все потоки завершили выполнение.

Теперь настало время поближе познакомиться с объектом-функцией

process_chunk
, который собственно и делает всю содержательную работу (1). Сначала вызывается
std::partial_sum
для всего блока, включая последний элемент (2), но затем нам нужно узнать, первый это блок или нет (3). Если это не первый блок, то должно быть вычислено значение
previous_end_value
для предыдущего блока, поэтому нам придется его подождать (4). Чтобы добиться максимального распараллеливания, мы затем сразу же обновляем последний элемент (5), так чтобы это значение можно было передать дальше, в следующий блок (если таковой имеется) (6). Сделав это, мы можем с помощью
std::for_each
и простой лямбда-функции (7) обновить остальные элементы диапазона.

Если

previous_end_value
не существует, то это первый блок, поэтому достаточно обновить
end_value
для следующего блока (опять же, если таковой существует, — может случиться, что блок всего один) (8).

Наконец, если какая-то операция возбуждает исключение, мы перехватываем его (9) и сохраняем в объекте-обещании (10), чтобы оно было распространено в следующий блок, когда он попытается получить последнее значение из предыдущего блока (4). В результате все исключения доходят до последнего блока, где и возбуждаются повторно (11), поскольку в этой точке мы гарантированно работаем в главном потоке.

Из-за необходимости синхронизации между потоками этот код не получится переписать под

std::async
. Каждая задача ждет результатов, которые становятся доступны по ходу выполнения других задач, поэтому все задачи должны работать параллельно.

Разобравшись с решением, основанным на распространении результатов обработки предыдущих блоков, посмотрим на второй из описанных алгоритмов вычисления частичных сумм.

Реализация прогрессивно-попарного алгоритма вычисления частичных сумм

Второй способ вычисления частичных сумм, основанный на сложении элементов, расположенных на все большем расстоянии друг от друга, работает лучше всего, когда процессоры могут выполнять операции сложения синхронно. В таком случае никакой дополнительной синхронизации не требуется, потому что все промежуточные результаты можно сразу распространить на следующий нуждающийся в них процессор. Но на практике такие системы встречаются редко — разве что в случаях, когда один процессор может выполнять одну и ту же команду над несколькими элементами данных одновременно с помощью так называемых SIMD-команд (Single-Instruction/Multiple-Data — одиночный поток команд, множественный поток данных). Поэтому мы должны проектировать программу в расчете на общий случай и производить явную синхронизацию на каждом шаге.

Сделать это можно, например, с помощью барьера — механизма синхронизации, который заставляет потоки ждать, пока указанное количество потоков достигнет барьера. Когда все потоки подошли к барьеру, они разблокируются и могут продолжать выполнение. В библиотеке C++11 Thread Library готовая реализация барьера отсутствует, так что нам придется написать свою собственную.

Представьте себе американские горки в парке аттракционов. Если желающих покататься достаточно, то смотритель не запустит состав, пока не будут заняты все места. Барьер работает точно так же: вы заранее задаете число «мест», и потоки будут ждать, пока все «места» заполнятся. Когда ожидающих потоков собирается достаточно, они все получают возможность продолжить выполнение; барьер при этом сбрасывается и начинает ждать следующую партию потоков. Часто такая конструкция встречается в цикле, где на каждой итерации у барьера собираются одни и те же потоки. Идея в том, чтобы все потоки «шли в ногу» — никто не должен забегать вперед. В рассматриваемом алгоритме такой «поток-торопыга» недопустим, потому что он мог бы модифицировать данные, которые еще используются другими потоками, или, наоборот, сам попытался бы использовать еще не модифицированные должным образом данные.

В следующем листинге приведена простая реализация барьера.


Листинг 8.12. Простой класс барьера

class barrier {

 unsigned const count;

 std::atomic spaces;

 std::atomic generation;

public:

 explicit barrier (unsigned count_) : ←
(1)

 count(count_), spaces(count), generation(0) {}


 void wait() {

  unsigned const my_generation = generation; ←
(2)

  if (!--spaces) {←
(3)

  spaces = count;←
(4)

  ++generation;  ←
(5)

  } else {

  while (generation == my_generation) ←
(6)

   std::this_thread::yield();      ←
(7)

  }

 }

};

Здесь при конструировании барьера мы указываем количество «мест» (1), которое сохраняется в переменной count. В начале количество мест у барьера spaces равно

count
. Когда очередной поток выполняет функцию
wait()
, значение
spaces
уменьшается на единицу (3). Как только оно обращается в нуль, количество мест возвращается к исходному значению
count
(4), а переменная
generation
увеличивается, что служит для других потоков сигналом к продолжению работы (5). Если число свободных мест больше нуля, то поток должен ждать. В этой реализации используется простой спинлок (6), который сравнивает текущее значение
generation
с тем, что было запомнено в начале
wait()
(2). Поскольку
generation
изменяется только после того, как все потоки подошли к барьеру (5), мы можем во время ожидания уступить процессор с помощью
yield()
(7), чтобы ожидающий поток не потреблял процессорное время.

Эта реализация действительно очень простая — в ней для ожидания используется спинлок, поэтому она не идеальна для случая, когда потоки могут ждать долго, и совсем не годится в ситуации, когда в каждый момент времени может существовать более count потоков, выполнивших

wait()
. Если требуется, чтобы и в этих случаях барьер работал правильно, то следует использовать более надежную (но и более сложную) реализацию. Кроме того, я ограничился последовательно согласованными операциями над атомарными переменными, потому что они проще для понимания, но вы, возможно, захотите ослабить ограничения на порядок доступа к памяти. Такая глобальная синхронизация дорого обходится в массивно параллельных архитектурах, так как строку кэша, содержащую состояние барьера, приходится передавать всем участвующим процессорам (см. обсуждение перебрасывания кэша в разделе 8.2.2). Поэтому нужно тщательно продумать, является ли это решение оптимальным.

Как бы то ни было, барьер — это именно то, что нам необходимо в данном случае; имеется фиксированное число потоков, которые должны синхронизироваться на каждой итерации цикла. Ну почти фиксированное. Как вы, наверное, помните, элементы, расположенные близко к началу списка, получают окончательные значения всего через пару шагов. Это означает, что либо все потоки должны крутиться в цикле, пока не будет обработан весь диапазон, либо барьер должен поддерживать выпадение потоков, уменьшая значение

count
. Я предпочитаю второй вариант, чтобы не заставлять потоки выполнять бессмысленную работу в ожидании завершения последнего шага.

Но тогда нужно сделать

count
атомарной переменной, чтобы ее можно было изменять из нескольких потоков без внешней синхронизации:

std::atomic count;

Инициализация не меняется, но при переустановке

spaces
теперь нужно явно загружать
spaces
с помощью операции
load()
:

spaces = count.load();

Больше никаких изменений в

wait()
вносить не надо, но необходима новая функция-член для уменьшения count. Назовем ее
done_waiting()
, потому что с ее помощью поток заявляет, что больше ждать не будет.

void done_waiting() {

 --count;        ←
(1)

 if (!--spaces) {    ←
(2)

  spaces = count.load();←
(3)

  ++generation;

 }

}

Прежде всего мы уменьшаем

count
(1)
, чтобы при следующей переустановке
spaces
было отражено новое, меньшее прежнего, количество ожидающих потоков. Затем уменьшаем количество свободных мест
spaces
(2). Если этого не сделать, то остальные потоки будут ждать вечно, так как при инициализации
spaces
было задано старое, большее, значение. Если текущий поток является последним в партии, то количество мест нужно переустановить и увеличить
generation
на единицу (3) — так же, как в
wait()
. Основное отличие от
wait()
заключается в том, что поток не должен ждать — он же сам объявляет, что больше ждать не будет!

Теперь мы готовы написать вторую реализацию алгоритма вычисления частичных сумм. На каждом шаге каждый поток вызывает функцию

wait()
барьера, чтобы все потоки пересекли его вместе, а, закончив работу, поток вызывает функцию
done_waiting()
, чтобы уменьшить счетчик. Если наряду с исходным диапазоном использовать второй буфер, то барьер обеспечивает необходимую синхронизацию. На каждом шаге потоки либо читают исходный диапазон и записывают новое значение в буфер, либо наоборот — читают буфер и записывают новое значение в исходный диапазон. На следующем шаге исходный диапазон и буфер меняются местами. Тем самым мы гарантируем отсутствие гонки между операциями чтения и записи в разных потоках. Перед выходом из цикла каждый поток должен позаботиться о записи окончательного значения в исходный диапазон. В листинге 8.13 все это собрано воедино.


Листинг 8.13. Параллельная реализация

partial_sum
методом попарных обновлений

struct barrier {

 std::atomic count;

 std::atomic spaces;

 std::atomic generation;


 barrier(unsigned count_) :

  count(count_), spaces(count_), generation(0) {}


 void wait() {

  unsigned const gen = generation.load();

  if (!--spaces) {

  spaces = count.load();

  ++generation;

  } else {

  while (generation.load() == gen) {

   std::this_thread::yield();

  }

  }

 }


 void done_waiting() {

  --count;

  if (!--spaces) {

  spaces = count.load();

  ++generation;

  }

 }

};


template

void parallel_partial_sum(Iterator first, Iterator last) {

 typedef typename Iterator::value_type value_type;


 struct process_element { ←
(1)

  void operator()(Iterator first, Iterator last,

  std::vector& buffer,

  unsigned i, barrier& b) {

  value_type& ith_element = *(first + i);

  bool update_source = false;


  for (unsigned step = 0, stride = 1;

    stride <= i; ++step, stride *= 2) {

   value_type const& source = (step % 2) ? ←
(2)

   buffer[i] : ith_element;

   value_type& dest = (step % 2) ?

   ith_element:buffer[i];

   value_type const& addend = (step % 2) ? ←
(3)

   buffer[i - stride] : *(first + i – stride);


   dest = source + addend; ←
(4)

   update_source = !(step % 2);

   b.wait(); ←
(5)

  }

  if (update_source) { ←
(6)

   ith_element = buffer[i];

  }

  b.done_waiting(); ←
(7)

  }

 };


 unsigned long const length = std::distance(first, last);


 if (length <= 1)

  return;


 std::vector buffer(length);

 barrier b(length);

 std::vector threads(length - 1); ←
(8)

 join_threads joiner(threads);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (length - 1); ++i) {

  threads[i] = std::thread(process_element(), first, last,←
(9)

  std::ref(buffer), i, std::ref(b));

 }

 process_element()(first, last, buffer, length - 1, b);  ←
(10)

}

Общая структура кода вам, наверное, уже понятна. Имеется класс с оператором вызова (

process_element
), который выполняет содержательную работу (1) и вызывается из нескольких потоков (9), хранящихся в векторе (8), а также из главного потока (10). Важное отличие заключается в том, что теперь число потоков зависит от числа элементов в списке, а не от результата, возвращаемого функцией
std::thread::hardware_concurrency
. Я уже говорил, что эта идея не слишком удачна, если только вы не работаете на машине с массивно параллельной архитектурой, где потоки обходятся дешево. Но это неотъемлемая часть самой идеи решения. Можно было бы обойтись и меньшим числом потоков, поручив каждому обработку нескольких значений из исходного диапазона, но тогда при относительно небольшом количестве потоков этот алгоритм оказался бы менее эффективен, чем алгоритм с прямым распространением.

Так или иначе, основная работа выполняется в операторе вызове из класса

process_element
. На каждом шаге берется либо i-ый элемент из исходного диапазона, либо i-ый элемент из буфера (2) и складывается с предшествующим элементом, отстоящим от него на расстояние
stride
(3); результат сохраняется в буфере, если мы читали из исходного диапазона, и в исходном диапазоне — если мы читали из буфера (4). Перед тем как переходить к следующему шагу, мы ждем у барьера (5). Работа заканчивается, когда элемент, отстоящий на расстояние
stride
, оказывается слева от начала диапазона. В этом случае мы должно обновить элемент в исходном диапазоне, если сохраняли окончательный результат в буфере (6). Наконец, мы вызываем функцию
done_waiting()
(7), сообщая барьеру, что больше ждать не будем.

Отметим, что это решение не безопасно относительно исключений. Если один из рабочих потоков возбудит исключение в

process_element
, то приложение аварийно завершится. Решить эту проблему можно было бы, воспользовавшись
std::promise
для запоминания исключения, как в реализации
parallel_find
из листинга 8.9, или просто с помощью объекта
std::exception_ptr
, защищенного мьютексом.

Вот и подошли к концу обещанные три примера. Надеюсь, они помогли уложить в сознании соображения, высказанные в разделе 8.1, 8.2, 8.3 и 8.4, и показали, как описанные приемы воплощаются в реальном коде.

8.6. Резюме

Эта глава оказалась очень насыщенной. Мы начали с описания различных способов распределения работы между потоками, в частности предварительного распределения данных и использования нескольких потоков для формирования конвейера. Затем мы остановились на низкоуровневых аспектах производительности многопоточного кода, обратив внимание на феномен ложного разделения и проблему конкуренции за данные. Далее мы перешли к вопросу о том, как порядок доступа к данным влияет на производительность программы. После этого мы поговорили о дополнительных соображениях при проектировании параллельных программ, в частности о безопасности относительно исключений и о масштабируемости. И закончили главу рядом примеров реализации параллельных алгоритмов, на которых показали, какие проблемы могут возникать при проектировании многопоточного кода.

В этой главе пару раз всплывала идея пула потоков — предварительно сконфигурированной группы потоков, выполняющих задачи, назначенные пулу. Разработка хорошего пула потоков — далеко не тривиальное дело. В следующей главе мы рассмотрим некоторые возникающие при этом проблемы, а также другие, более сложные, аспекты управления потоками.

Загрузка...