Глава 12 Многопоточная обработка

12.0. Введение

В данной главе даются рецепты написания многопоточных программ на C++ с использованием библиотеки Boost Threads, автором которой является Вильям Кемпф (William Kempf). Boost — это набор переносимых, высокопроизводительных библиотек с открытым исходным кодом, неоднократно проверенным программистами, и с широким спектром сложности: от простых структур данных до сложного фреймворка синтаксического анализа. Библиотека Boost Threads обеспечивает фреймворк для многопоточной обработки. Дополнительную информацию по проекту Boost можно найти на сайте www.boost.org.

Стандартом C++ не предусматривается встроенная поддержка многопоточной обработки, поэтому нельзя написать переносимый программный код с многопоточной обработкой, подобно тому как создается переносимый код, использующий такие классы стандартной библиотеки, как

string
,
vector
,
list
и т.д. Однако в библиотеке Boost Threads пройден значительный путь к созданию стандартной, переносимой библиотеки многопоточной обработки, и использование этой библиотеки позволяет свести к минимуму головную боль, вызываемую многими обычными проблемами, связанными с многопоточной обработкой.

Все же в отличие от стандартной библиотеки и библиотек независимых разработчиков применение библиотеки многопоточной обработки нельзя свести к распаковке ее архива, включению операторов

#includ
e и написанию программного кода, не требующего особых усилий. Во всех приложениях многопоточной обработки (кроме самых простых) необходимо тщательно подойти к разработке проекта, используя проверенные шаблоны и известные тактические приемы, позволяющие избегать ошибок, неизбежно возникающих в противном случае. В типичном однопоточном приложении обычные ошибки программирования находятся легко: циклы с пропуском одного шага, разыменование нулевого или удаленного указателя, потеря точности при преобразованиях чисел с плавающей точкой и т.д. В программах с многопоточной обработкой ситуация другая. Мало того, что задача отслеживания с помощью отладчика действий нескольких потоков становится очень трудоемкой, многопоточные программы работают недетерминировано, т.е. ошибки могут проявляться только в редких или сложных ситуациях.

Именно по этой причине нельзя рассматривать данную главу как введение в многопоточное программирование. Если вы уже имели опыт многопоточного программирования, но не на C++ или без использования библиотеки Boost Threads, эта глава будет полезна для вас. Однако описание основных принципов многопоточного программирования выходит за рамки этой книги. Если до сих пор вы никогда не занимались многопоточным программированием, по-видимому, вам следует прочитать вводный материал по этой тематике, но такой материал трудно найти, потому что большинство программистов не используют потоки выполнения (хотя, возможно, их и следовало бы применить).

Большая часть документации Boost и некоторые приводимые ниже рецепты при обсуждении классов используют понятия концепции и модели. Концепция — это абстрактное описание чего-то, обычно класса и его поведения, причем не делается никаких предположений относительно реализации. Как правило, сюда входит описание действий при конструировании и уничтожении, а также описание каждого метода с указанием предусловий, параметров и постусловий. Например, концепция мьютекса (Mutex) описывается как нечто допускающее блокирование и разблокирование только одним потоком в данный момент времени. Модель — это конкретная реализация концепции, например класс

mutex
в библиотеке Boost Threads. Уточнение (refinement) концепции — это некая ее специализация, например
ReadWriteMutex
, т.е. мьютекс с некоторыми дополнительными возможностями.

Наконец, потоки делают что-то одно из трех: работают, находятся в ожидании чего-то или готовы начать работу, но ничего не ожидают и не выполняют никаких действий. Эти состояния носят названия состояний выполнения (run), ожидания (wait) и готовности (ready). Эти термины я использую в последующих рецептах.

12.1. Создание потока

Проблема

Требуется создать поток (thread) для выполнения некоторой задачи, в то время как главный поток продолжает свою работу.

Решение

Создайте объект класса

thread
и передайте ему функтор, который выполняет данную работу. Создание объекта потока приведет к инстанцированию потока операционной системы, который начинает выполнять оператор
operator()
с вашим функтором (или начинает выполнять функцию, переданную с помощью указателя). Пример 12.1 показывает, как это делается.

Пример 12.1. Создание потока

#include 

#include 

#include 


struct MyThreadFunc {

 void operator()() {

  // Что-нибудь работающее долго...

 }

} threadFun;


int main() {

 boost::thread myThread(threadFun); // Создать поток, запускающий

                   // функцию threadFun

 boost.:thread::yield(); // Уступить порожденному потоку квант времени.

             // чтобы он мог выполнить какую-то работу.

// Выполнить какую-нибудь другую работу

myThread join(); // Текущий поток (т.е поток функции main) прежде.

         // чем завершиться, будет ждать окончания myThread

}

Обсуждение

Создается поток обманчиво просто. Вам необходимо лишь создать объект

thread
в стеке или в динамической памяти и передать ему функтор, который укажет, с чего начать работу. В данном случае термин «поток» (thread) используется в двух смыслах. Во-первых, это объект класса
thread
, который является обычным объектом C++. При ссылке на этот объект я буду говорить «объект потока». Кроме того, существует поток выполнения, который является потоком операционной системы, представленным объектом
thread
. Когда я говорю «поток» (в отличие от названия класса потока, напечатанного моноширинным шрифтом), я имею в виду поток операционной системы.

Теперь перейдем непосредственно к рассмотрению программного кода в примере. Конструктор

thread
принимает функтор (или указатель функции), имеющий два аргумента и возвращающий
void
. Рассмотрим следующую строку из примера 12.1.

boost::thread myThread(threadFun);

Она создает в стеке объект

myThread
, являющийся новым потоком операционной системы, который начинает выполнять функцию
threadFun
. В этот момент программный код функции
threadFun
и код функции
main
(по крайней мере, теоретически) выполняются параллельно. Конечно, на самом деле они могут выполняться не параллельно, поскольку ваша машина может иметь только один процессор, и в этом случае параллельная работа невозможна (благодаря недавно разработанным архитектурам процессоров это утверждение не совсем точное, но в настоящий момент я не буду принимать в расчет двухъядерные процессоры и т.п.). Если у вас только один процессор, то операционная система предоставит каждому созданному вами потоку квант времени в состоянии выполнения, перед тем как приостановить его работу. Так как эти кванты времени могут иметь различную величину, никогда нельзя с уверенностью сказать, какой из потоков раньше достигнет определенной точки. Именно в этой особенности многопоточного программирования заключается его сложность: состояние многопоточной программы недетерминировано. При выполнении несколько раз одной и той же многопоточной программы можно получить различные результаты. Темой рецепта 12.2 является координация ресурсов, используемых несколькими потоками.

После создания потока

myThread
поток
main
продолжает свою работу, по крайней мере на мгновение, пока не достигнет следующей строки.

boost::thread::yield();

Это переводит текущий поток (в данном случае поток main) в неактивное состояние, что означает переключение операционной системы на другой поток или процесс, используя некоторую политику, которая зависит от операционной системы. С помощью функции

yield
операционная система уведомляется о том, что текущий поток хочет уступить оставшуюся часть кванта времени. В это время новый поток выполняет
threadFun
. После завершения
threadFun
дочерний поток исчезает. Следует отметить, что объект
thread
не уничтожается, потому что он является объектом С++, который по-прежнему находится в области видимости. Эта особенность играет важную роль.

Объект потока — это некий объект, существующий в динамической памяти или в стеке и работающий подобно любому другому объекту С++. Когда программный код выходит из области видимости потока, все находящиеся в стеке объекты потока уничтожаются, или, с другой стороны, когда вызывающая программа выполняет оператор

delete
для
thread*
, исчезает соответствующий объект
thread
, который находится в динамической памяти. Но объекты
thread
выступают просто как прокси относительно реальных потоков операционной системы, и когда они уничтожаются, потоки операционной системы не обязательно исчезают. Они просто отсоединяются, что означает невозможность их подключения в будущем. Это не так уж плохо.

Потоки используют ресурсы, и в любом (хорошо спроектированном) многопоточном приложении управление доступом к таким ресурсам (к объектам, сокетам, файлам, «сырой» памяти и т.д.) осуществляется при помощи мьютексов, которые являются объектами, обеспечивающими последовательный доступ к каким-либо объектам со стороны нескольких потоков (см. рецепт 12.2). Если поток операционной системы оказывается «убитым», он не будет освобождать свои блокировки и свои ресурсы, подобно тому как «убитый» процесс не оставляет шансов на очистку буферов или правильное освобождение ресурсов операционной системы. Простое завершение потока в тот момент, когда вам кажется, что он должен быть завершен, — это все равно что убрать лестницу из-под маляра, когда время его работы закончилось.

Поэтому предусмотрена функция-член

join
. Как показано в примере 12.1, вы можете вызвать
join
, чтобы дождаться завершения работы дочернего потока,
join
— это вежливый способ уведомления потока, что вы собираетесь ждать завершения его работы.

myThread.join();

Поток, вызвавший функцию

join
, переходит в состояние ожидания, пока не закончит свою работу другой поток, представленный объектом
myThread
. Если он никогда не завершится, то никогда не завершится и
join
. Применение
join
— наилучший способ ожидания завершения работы дочернего потока.

Возможно, вы заметили, что, если передать что-либо осмысленное функции

threadFun
, но закомментировать
join
, поток не завершит свою работу. Вы можете убедиться в этом, выполняя в
threadFun
цикл или какую-нибудь продолжительную операцию. Это объясняется тем, что операционная система уничтожает процесс вместе со всеми его дочерними процессами независимо от того, закончили или нет они свою работу. Без вызова
join
функция
main
не будет ждать окончания работы своих дочерних потоков: она завершается, и поток операционной системы уничтожается.

Если требуется создать несколько потоков, рассмотрите возможность их группирования в объект

thread_group
. Объект
thread_group
может управлять объектами двумя способами. Во-первых, вы можете вызвать
add_thread
с указателем на объект
thread
, и этот объект будет добавлен в группу. Ниже приводится пример.

boost::thread_group grp;

boost::thread* p = new boost::thread(threadFun);

grp.add_thread(p);

// выполнить какие-нибудь действия...

grp.remove_thread(p);

При вызове деструктора

grp
он удалит оператором
delete
каждый указатель потока, который был добавлен в
add_thread
. По этой причине вы можете добавлять в
thread_group
только указатели объектов потоков, размещённых в динамической памяти. Удаляйте поток путем вызова
remove_thread
с передачей адреса объекта потока (
remove_thread
находит в группе соответствующий объект потока, сравнивая значения указателей, а не сами объекты).
remove_thread
удалит указатель, ссылающийся на этот поток группы, но вам придется все же удалить сам поток с помощью оператора
delete
.

Кроме того, вы можете добавить поток в группу, не создавая его непосредственно, а используя для этого вызов функции

create_thread
, которая (подобно объекту потока) принимает функтор в качестве аргумента и начинает его выполнение в новом потоке операционной системы. Например, для порождения двух потоков и добавления их в группу сделайте следующее.

boost::thread_group grp;

grp.create_thread(threadFun);

grp.create_thread(threadFun); // Теперь группа grp содержит два потока

grp.join_all(); // Подождать завершения всех потоков

При добавлении потоков в группу при помощи

create_thread
или
add_thread
вы можете вызвать
join_all
для ожидания завершения работы всех потоков группы. Вызов
join_all
равносилен вызову
join
для каждого потока группы:
join_all
возвращает управление после завершения работы всех потоков группы.

Создание объекта потока позволяет начать выполнение отдельного потока. Однако с помощью средств библиотеки Boost Threads это делается обманчиво легко, поэтому необходимо тщательно обдумывать проект. Прочтите остальные рецепты настоящей главы, где даются дополнительные предостережения относительно применения потоков.

Смотри также

Рецепт 12.2.

12.2. Обеспечение потокозащищенности ресурсов

Проблема

В программе используется несколько потоков и требуется гарантировать невозможность модификации ресурса несколькими потоками одновременно. В целом это называется обеспечением потокозащищенности (thread-safe) ресурсов или сериализацией доступа к ним.

Решение

Используйте класс

mutex
, определенный в boost/thread/mutex.hpp, для синхронизации доступа к потокам. Пример 12.2 показывает, как можно использовать в простых случаях объект
mutex
для управления параллельным доступом к очереди.

Пример 12.2. Создание потокозащищенного класса

#include 

#include 

#include 


// Простой класс очереди; в реальной программе вместо него следует

// использовать std::queue

template

class Queue {

public:

 Queue() {}

 ~Queue() {}

 void enqueue(const T& x) {

  // Блокировать мьютекс для этой очереди

  boost::mutex::scoped_lock lock(mutex_);

  list_.push_back(x);

  // scoped_lock автоматически уничтожается (и, следовательно, мьютекс

  // разблокируется) при выходе из области видимости

 }


 T dequeue() {

  boost::mutex::scoped_lock lock(mutex_);

  if (list_.empty())

   throw "empty!";    // Это приводит к выходу из текущей области

  T tmp = list_.front(); // видимости, поэтому блокировка освобождается

  list_.pop_front();

  return(tmp);

 } // Снова при выходе из области видимости мьютекс разблокируется


private:

 std::list list_;

 boost::mutex mutex_;

};


Queue queueOfStrings;


void sendSomething() {

 std::string s;

 for (int i = 0; i < 10; ++i) {

  queueOfStrings.enqueue("Cyrus");

 }

}


void recvSomething() {

 std::string s;

 for(int i = 0; i < 10; ++i) {

  try {

  s = queueOfStrings.dequeue();

 } catch(...) {}

 }

}


int main() {

 boost::thread thr1(sendSomething);

 boost::thread thr2(recvSomething);

 thr1.join();

 thr2.join();

}

Обсуждение

Обеспечение потокозащищенности классов, функций, блоков программного кода и других объектов является сущностью многопоточного программирования. Если вы проектируете какой-нибудь компонент программного обеспечения с возможностями многопоточной обработки, то можете постараться обеспечить каждый поток своим набором ресурсов, например объектами в стеке и динамической памяти, ресурсами операционной системы и т.д. Однако рано или поздно вам придется обеспечить совместное использование различными потоками каких-либо ресурсов. Это может быть совместная очередь поступающих запросов (как это происходит на многопоточном веб-сервере) или нечто достаточно простое, как поток вывода (например, в файл журнала или даже в

cout
). Стандартный способ координации безопасного совместного использования ресурсов подразумевает применение мьютекса (mutex), который обеспечивает монопольный доступ к чему-либо.

Остальная часть обсуждения в целом посвящена мьютексам, и в частности методам использования

boost::mutex
для сериализации доступа к ресурсам. Я использую терминологию подхода «концепция/модель», о котором я говорил кратко во введении настоящей главы. Концепция — это абстрактное (независимое от языка) описание чего-либо, а модель концепции — конкретное ее представление в форме класса С++. Уточнение концепции — это определенная концепция с некоторыми дополнительными возможностями.

Все-таки параллельное программирование представляет собой сложную тему, и в одном рецепте нельзя отразить все применяемые в этой технологии методы. Можно использовать много шаблонов проектирования и разных стратегий, подходящих для различных приложений. Если при проектировании программного обеспечения вы предполагаете, что многопоточная обработка составит значительный объем, или проектируете высокопроизводительные приложения, необходимо прочитать хорошую книгу по шаблонам многопоточной обработки. Многие проблемы, связанные с трудностями отладки многопоточных программ, могут быть успешно преодолены за счет тщательного и продолжительного проектирования.

Использование мьютексов

Концепция мьютекса проста: мьютекс это некий объект, представляющий ресурс; только один поток может его блокировать или разблокировать в данный момент времени. Он является флагом, который используется для координации доступа к ресурсу со стороны нескольких пользователей. В библиотеке Boost Threads моделью концепции мьютекса является класс

boost::mutex
. В примере 1 2.2 доступ для записи в классе Queue обеспечивается переменной-членом
mutex
.

boost::mutex mutex_;

mutex_
должен блокироваться какой-нибудь функцией-членом, которая должна изменять состояние очереди обслуживаемых элементов. Сам объект
mutex_
ничего не знает о том, что он представляет. Это просто флаг блокировки/разблокировки, используемый всеми пользователями некоторого ресурса.

В примере 12.2, когда какая-нибудь функция-член класса

Queue
собирается изменить состояние объекта, она сначала должна заблокировать
mutex_
. Только один поток в конкретный момент времени может его заблокировать, что не позволяет нескольким объектам одновременно модифицировать состояние объекта
Queue
. Таким образом, мьютекс
mutex
представляет собой простой сигнальный механизм, но это нечто большее, чем просто
bool
или
int
, потому что для mutex необходим сериализованный доступ, который может быть обеспечен только ядром операционной системы. Если вы попытаетесь сделать то же самое с
bool
, это не сработает, потому что ничто не препятствует одновременной модификации состояния
bool
несколькими потоками. (В разных операционных системах это осуществляется по-разному, и именно поэтому не просто реализовать переносимую библиотеку потоков.)

Объекты

mutex
блокируются и разблокируются, используя несколько различных стратегий блокировки, самой простой из которых является блокировка
scoped_lock
.
scoped_lock
— это класс, при конструировании объекта которого используется аргумент типа
mutex
, блокируемый до тех пор, пока не будет уничтожена блокировка
lock
. Рассмотрим функцию-член
enqueue
в примере 12.2, которая показывает, как
scoped_lock
работает совместно с мьютексом
mutex_
.

void enqueue(const T& x) {

 boost::mutex::scoped_lock lock(mutex_);

 list_.push_back(x);

} // разблокировано!

Когда

lock
уничтожается,
mutex_
разблокируется. Если
lock
конструируется для объекта
mutex
, который уже заблокирован другим потоком, текущий поток переходит в состояние ожидания до тех пор, пока
lock
не окажется доступен.

Такой подход поначалу может показаться немного странным: а почему бы мьютексу

mutex
не иметь методы
lock
и
unlock
? Применение класса
scoped_lock
, который обеспечивает блокировку при конструировании и разблокировку при уничтожении, на самом деле более удобно и менее подвержено ошибкам. Когда вы создаете блокировку, используя
scoped_lock
, мьютекс блокируется на весь период существования объекта
scoped_lock
, т.е. вам не надо ничего разблокировать в явной форме на каждой ветви вычислений. С другой стороны, если вам приходится явно разблокировать захваченный мьютекс, необходимо гарантировать перехват любых исключений, которые могут быть выброшены в вашей функции (или где-нибудь выше ее в стеке вызовов), и гарантировать разблокировку
mutex
. При использовании
scoped_lock
, если выбрасывается исключение или функция возвращает управление, объект
scoped_lock
автоматически уничтожается и
mutex
разблокируется.

Использование мьютекса позволяет сделать всю работу, однако хочется немного большего. При таком подходе нет различия между чтением и записью, что существенно, так как неэффективно заставлять потоки ждать в очереди доступа к ресурсу, когда многие из них выполняют только операции чтения, для которых не требуется монопольный доступ. Для этого в библиотеке Boost Threads предусмотрен класс

read_write_mutex
. Пример 12.3 показывает, как можно реализовать пример 12.2, используя
read_write_mutex
с функцией-членом
front
, которая позволяет вызывающей программе получить копию первого элемента очереди без его выталкивания.

Пример 12.3. Использование мьютекса чтения/записи

#include 

#include 

#include 

#include 


template

class Queue {

 public:

 Queue() : // Использовать мьютекс чтения/записи и придать ему приоритет

      // записи

 rwMutex_(boost::read_write_scheduling_policy::writer_priority) {}

 ~Queue() {}

 void enqueue(const T& x) {

  // Использовать блокировку чтения/записи, поскольку enqueue

  // обновляет состояние

  boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

  list_.push_back(x);

 }


 T dequeue() {

  // Снова использовать блокировку для записи

  boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

  if (list_.empty())

  throw "empty!";

  T tmp = list_.front();

  list_.pop_front();

  return(tmp);

 }


 T getFront() {

  // Это операция чтения, поэтому требуется блокировка только для чтения

 boost::read_write_mutex::scoped_read_lock.readLock(rwMutex_);

  if (list_.empty())

  throw "empty!";

  return(list_.front());

 }

private:

 std::list list_;

 boost::read_write_mutex rwMutex_;

};


Queue queueOfStrings;


void sendSomething() {

 std::string s;

 for (int i = 0, i < 10; ++i) {

  queueOfStrings.enqueue("Cyrus");

 }

}


void checkTheFront() {

 std::string s;

 for (int i=0; i < 10; ++i) {

  try {

  s = queueOfStrings.getFront();

  } catch(...) {}

 }

}


int main() {

 boost::thread thr1(sendSomething);

 boost::thread_group grp;

 grp.сreate_thread(checkTheFront);

 grp.create_thread(checkTheFront);

 grp.сreate_thread(checkTheFront);

 grp_create_thread(checkTheFront);

 thr1.join();

 grp.join_all();

}

Здесь необходимо отметить несколько моментов. Обратите внимание, что теперь я использую

read_write_mutex
.

boost::read_write_mutex rwMutex_;

При использовании мьютексов чтения/записи блокировки тоже выполняются иначе. В примере 12.3, когда мне нужно заблокировать

Queue
для записи, я создаю объект класса
scoped_write_lock
.

boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

А когда мне просто требуется прочитать

Queue
, я использую
scoped_read_lock
.

boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);

Блокировки чтения/записи удобны, но они не предохраняют вас от серьезных ошибок. На этапе компиляции не делается проверка ресурса, представленного мьютексом

rwMutex_
, гарантирующая отсутствие изменения ресурса при блокировке только для чтения. Вы сами должны позаботиться о том, чтобы поток мог модифицировать состояние объекта только при блокировке для записи, поскольку компилятор это не будет делать.

Точная последовательность выполнения блокировок определяется политикой их планирования; эту политику вы задаете при конструировании объекта mutex. В библиотеке Boost Threads предусматривается четыре политики.

reader_priority

Потоки, ожидающие выполнения блокировки для чтения, ее получат раньше потоков, ожидающих выполнения блокировки для записи.

writer_priority

Потоки, ожидающие выполнения блокировки для записи, ее получат раньше потоков, ожидающих выполнения блокировки для чтения.

alternating_single_read

Чередуются блокировки для чтения и для записи. Один читающий поток получает возможность блокировки для чтения, когда подходит «очередь» читающих потоков. Эта политика в целом отдает приоритет записывающим потокам. Например, если мьютекс заблокирован для записи и имеется несколько потоков, ожидающих блокировки для чтения, а также один поток, ожидающий блокировки для записи, сначала будет выполнена одна блокировка для чтения, затем блокировка для записи и после нее — все остальные блокировки для чтения. Подразумевается, что за это время не будет новых запросов на блокировку.

alternating_many_reads

Чередуются блокировки для чтения и для записи. Выполняются все блокировки для чтения, когда подходит «очередь» читающих потоков. Другими словами, эта политика приводит к опустошению очереди всех потоков, ожидающих блокировки для чтения, в промежутке между блокировками для записи.

Каждая из этих политик имеет свои достоинства и недостатки, и их влияние будет сказываться по-разному в различных приложениях. Необходимо тщательно подойти к выбору политики, потому что если просто обеспечить приоритет для чтения или записи, это приведет к зависанию, которое я более подробно описываю ниже.

Опасности

При программировании многопоточной обработки возникает три основные проблемы: взаимная блокировка (deadlock), зависание (starvation) и состояния состязания (race conditions — условия гонок). Существуют различные по сложности методы устранения этих проблем, но их рассмотрение выходит за рамки данного рецепта. Я дам описание каждой их этих проблем, чтобы вы знали, чего следует остерегаться, но если вы планируете разработку многопоточного приложения, вам необходимо сначала выполнить некоторое предварительную работу по шаблонам многопоточной обработки.

Взаимная блокировка связана с наличием, по крайней мере, двух потоков и двух ресурсов. Пусть имеется два потока, А и В, и два ресурса, X и Y, причем поток А блокирует ресурс X, а В блокирует Y. Взаимная блокировка возникает в том случае, когда А пытается заблокировать Y, а В пытается заблокировать X. Если при работе потоков не предусмотреть какой-либо способ устранения взаимных блокировок, они будут ждать бесконечно.

Библиотека Boost Threads позволяет избегать взаимных блокировок благодаря уточнению концепций мьютекса и блокировки. Пробный мьютекс (try mutex) — это мьютекс, который используется для определения возможности блокировки путем выполнения пробной блокировки (try lock); она может быть успешной или нет, но не блокирует ресурс, а ждет момента, когда блокировка станет возможной. Применяя модели этих концепций в форме классов

try_mutex
и
scoped_try_lock
, вы можете в своей программе идти дальше и выполнять какие-то другие действия, если доступ к требуемому ресурсу заблокирован. Существует еще одно уточнение концепции пробной блокировки, называемое временной блокировкой (timed lock). Я не рассматриваю здесь подробно временные блокировки; детальное их описание вы найдете в документации библиотеки Boost Threads.

Например, в классе

Queue
из примера 12.2 требуется использовать мьютекс для пробной блокировки с возвратом функцией
dequeue
значения типа
bool
, показывающего, может или не может быть извлечен из очереди первый элемент. В этом случае при применении функции
dequeue
не приходится ждать блокировки очереди. Ниже показано, как можно переписать функцию
dequeue
.

bool dequeue(T& x) {

 boost::try_mutex::scoped_try_lock lock(tryMutex_);

 if (!lock.locked())

  return(false);

 else {

  if (list_.empty()) throw "empty!";

  x = list_.front();

  list_.pop_front();

  return(true);

 }

}

private:

 boost::try_mutex tryMutex_;

 // ...

Используемые здесь мьютекс и блокировка отличаются от тех, которые применялись в примере 12.2. Убедитесь, что используемые вами имена классов мьютекса и блокировки правильно квалифицированы, в противном случае вы получите не то, на что рассчитываете.

При сериализации доступа к чему-либо вы заставляете пользователей этого ресурса выстраиваться друг за другом и дожидаться свой очереди. Если положение пользователей ресурса в очереди остается неизменным, каждый из них имеет шанс получения доступа к ресурсу. Однако если некоторым пользователям разрешается сокращать свою очередь, то до находящихся в конце очередь может никогда не дойти. Возникает зависание.

При использовании мьютекса mutex пользователи ресурса, которые находятся в состоянии ожидания, образуют группу, а не последовательность. Нельзя сказать, что существует определенный порядок между потоками, ожидающими возможности выполнения блокировки. Для мьютексов чтения/записи в библиотеке Boost Threads используется четыре политики планирования блокировок, которые были описаны ранее. Поэтому при использовании мьютексов чтения/записи необходимо понимать смысл различных политик планирования и действий ваших потоков. Если вы используете политику

writer_priority
и у вас много потоков, создающих блокировки для записи, ваши читающие потоки будут зависать; то же самое произойдет при применении политики
reader_priority
, поскольку эти политики планирования всегда отдают предпочтение одному из двух типов блокировки. Если в ходе тестирования вы понимаете, что один из типов потоков продвигается в очереди недостаточно, рассмотрите возможность перехода на применение политики
alternating_many_reads
или
alternating_single_read
. Тип политики задается при конструировании мьютекса чтения/записи.

Наконец, состояние состязания возникает в том случае, когда в программе делается предположение об определенном порядке выполнения блокировок или об их атомарности, что оказывается неверным. Например, рассмотрим пользователя класса

Queue
, который опрашивает первый элемент очереди и при определенном условии извлекает его из очереди с помощью функции
dequeue
.

if (q.getFront() == "Cyrus") {

 str = q.dequeue();

 // ...

Этот фрагмент программного кода хорошо работает в однопоточной среде, потому что

q
не может быть модифицирован в промежутке между первой и второй строкой. Однако в условиях многопоточной обработки, когда практически в любой момент другой поток может модифицировать
q
, следует исходить из предположения, что совместно используемые объекты модифицируются, когда поток не блокирует доступ к ним. После строки 1 другой поток, работая параллельно, может извлечь следующий элемент из
q
при помощи функции
dequeue
, что означает получение в строке 2 чего-то неожиданного или совсем ничего. Как функция
getFront
, так и функция
dequeue
блокирует один объект
mutex
, используемый для модификации
q
, но между их вызовами мьютекс разблокирован, и, если другой поток находится в ожидании выполнения блокировки, он может это сделать до того, как получит свой шанс строка 2.

Проблема состояния состязания в этом конкретном случае решается путем гарантирования сохранения блокировки на весь период выполнения операции. Создайте функцию-член

dequeueIfEquals
, которая извлекает следующий объект из очереди, если он равен аргументу. Функция
dequeueIfEquals
может использовать блокировку, как и всякая другая функция.

T dequeueIfEquals(const T& t) {

 boost::mutex::scoped_lock lock(mutex_);

 if (list_.front() == t)

 // ...

Существуют состояния состязания другого типа, но этот пример должен дать общее представление о том, чего следует остерегаться. По мере увеличения количества потоков и совместно используемых ресурсов состояния состязания оказываются более изощренными и обнаруживать их сложнее. Поэтому следует быть особенно осторожным на этапе проектирования, чтобы не допускать их.

В многопоточной обработке самое сложное — гарантировать сериализованный доступ к ресурсам, потому что если это сделано неправильно, отладка становится кошмаром. Поскольку многопоточная программа по своей сути недетерминирована (так как потоки могут выполняться в различной очередности и с различными квантами времени при каждом новом выполнении программы), очень трудно точно обнаружить место и способ ошибочной модификации чего-либо. Здесь еще в большей степени, чем в однопоточном программировании, надежный проект позволяет минимизировать затраты на отладку и переработку.

12.3. Уведомление одного потока другим

Проблема

Используется шаблон, в котором один поток (или группа потоков) выполняет какие-то действия, и требуется сделать так, чтобы об этом узнал другой поток (или группа потоков). Может использоваться главный поток, который передает работу подчиненным потокам, или может использоваться одна группа потоков для пополнения очереди и другая для удаления данных из очереди и выполнения чего-либо полезного.

Решение

Используйте объекты

mutex
и
condition
, которые объявлены в boost/thread/mutex.hpp и boost/thread/condition.hpp. Можно создать условие (
condition
) для каждой ожидаемой потоками ситуации и при возникновении такой ситуации уведомлять все ее ожидающие потоки. Пример 12.4 показывает, как можно обеспечить передачу уведомлений в модели потоков «главный/подчиненные».

Пример 12.4. Передача уведомлений между потоками

#include 

#include 

#include 

#include 

#include 

#include 


class Request { /*...*/ };


// Простой класс очереди заданий; в реальной программе вместо этого класса

// используйте std::queue

template

class JobQueue {

public:

 JobQueue() {}

 ~JobQueue() {}


 void submitJob(const T& x) {

  boost::mutex::scoped_lock lock(mutex_);

 list_.push_back(x);

 workToBeDone_.notify_one();

 }


 T getJob() {

  boost::mutex::scoped_lock lock(mutex_);

  workToBeDone_.wait(lock); // Ждать удовлетворения этого условия, затем

               // блокировать мьютекс

  T tmp = list_.front();

  list_.pop_front();

  return(tmp);

 }


private:

 std::list list_;

 boost::mutex mutex_;

 boost::condition workToBeDone_;

};


JobQueue myJobQueue;


void boss() {

 for (;;) {

  // Получить откуда-то запрос

  Request req;

  myJobQueue.submitJob(req);

 }

}


void worker() {

 for (;;) {

  Request r(myJobQueue.getJob());

  // Выполнить какие-то действия с заданием...

 }

}


int main() {

 boost::thread thr1(boss);

 boost::thread thr2(worker);

 boost::thread thr3(worker);

 thr1.join();

 thr2.join();

 thr3.join();

}

Обсуждение

Объект условия использует мьютекс

mutex
и позволяет дождаться ситуации, когда он становится заблокированным. Рассмотрим пример 12.4, в котором представлена модифицированная версии класса
Queue
из примера 12.2. Я модифицировал очередь
Queue
, получая более специализированную очередь, а именно
JobQueue
, объекты которой являются заданиями, поступающими в очередь со стороны главного потока и обрабатываемыми подчиненными потоками.

Самое важное изменение класса

JobQueue
связано переменной-членом
workToBeDone_
типа
condition
. Эта переменная показывает, имеется или нет задание в очереди. Когда потоку требуется получить элемент из очереди, он вызывает функцию
getJob
, которая пытается захватить мьютекс и затем дожидаться возникновения новой ситуации, что реализуют следующие строки.

boost::mutex::scoped_lock lock(mutex_);

workToBeDone_.wait(lock);

Первая строка блокирует мьютекс обычным образом. Вторая строка разблокирует мьютекс и переводит его в состояние ожидания или в неактивное состояние до тех пор, пока не будет удовлетворено условие. Разблокирование мьютекса позволяет другим потокам использовать этот мьютекс; один из них должен установить ожидаемое условие, в противном случае другие потоки не смогут блокировать мьютекс, пока один поток ожидает возникновения необходимого условия.

В функции

submitJob
после помещения задания во внутренний список я добавил следующую строку.

workToBeDone_.notify_one();

В результате «удовлетворяется» условие, в ожидании которого находится

getJob
. Формально это означает, что если существуют какие-нибудь потоки, вызвавшие функцию
wait
для этого условия, то один из них перейдет в состояние выполнения. Для функции
getJob
это означает продолжение работы, приостановленной в следующей строке:

workToBeDone_.wait(lock);

Но это еще не все. Функция

wait
делает две вещи: она дожидается вызова в каком-нибудь потоке функции
notify_one
или
notify_all
для данного условия, затем она пытается блокировать соответствующий мьютекс. Поэтому, когда
submitJob
вызывает
notify_all
, фактически происходит следующее: ожидающий поток переходит в состояние выполнения и на следующем шаге пытается блокировать мьютекс, который все еще блокирует функция
submitJob
, поэтому он вновь переходит в состояние ожидания, пока не завершит работу функция
submitJob
. Таким образом,
condition::wait
требует, чтобы мьютекс был блокирован при его вызове, когда он оказывается разблокированным и затем вновь заблокированным при удовлетворении условия.

Для уведомления всех потоков, ожидающих удовлетворения некоторого условия, следует вызывать функцию

notify_all
. Она работает так же,
как notify_one
, за исключением того, что в состояние выполнения переходят все потоки, ожидающие это условие. Однако теперь все они будут пытаться выполнить блокировку, поэтому характер последующих действий зависит от типа мьютекса и типа используемой блокировки.

Применение условия позволяет управлять ситуацией более тонко, чем при использовании одних только мьютексов и блокировок. Рассмотрим представленный ранее класс

Queue
. Потоки, ожидающие получение элемента из очереди, находятся в состоянии ожидания до тех пор, пока они не смогут установить блокировку для записи и затем извлечь элемент из очереди. Может показаться, что это будет хорошо работать без применения какого-либо механизма сигнализации, но так ли на самом деле? А что произойдет, когда очередь окажется пустой? У вас нет большого выбора при реализации функции
dequeue
, если вы ждете удовлетворения некоторого условия: проверка наличия элементов в очереди и, если они отсутствуют, возврат управления; использование другого мьютекса, который блокируется при пустой очереди и разблокируется, когда очередь содержит данные (не подходящее решение) или возврат специального значения, когда очередь оказывается пустой. Все это проблематично или неэффективно. Если вы просто возвращаете управление, когда очередь пустая, выбрасывая исключение или возвращая специальное значение, то вашим клиентам придется постоянно проверять поступающие значения. Это означает бесполезную трату времени.

Объект

condition
позволяет пользовательским потокам находиться в неактивном состоянии, поэтому процессор может выполнять что-то другое, когда условие не удовлетворяется. Представим веб-сервер, использующий пул рабочих потоков, обрабатывающих поступающие запросы. Значительно лучше иметь дочерние потоки, находящиеся в состоянии ожидания, когда нет никакой активности, чем заставлять их выполнять бесконечный цикл или «засыпать» и «просыпаться» периодически для проверки очереди.

12.4. Однократная инициализация совместно используемых ресурсов

Проблема

Имеется несколько потоков, использующих один ресурс, который необходимо инициализировать только один раз.

Решение

Либо инициализируйте этот ресурс до запуска потоков, либо, если первое невозможно, используйте функцию

call_once
, определенную в
, и тип
once_flag
. Пример 12.5 показывает, как можно использовать
call_once
.

Пример 12.5. Однократная инициализация

#include 

#include 

#include 


// Класс, обеспечивающий некоторое соединение, которое должно быть

// инициализировано только один раз

struct Conn {

 static void init() {++i_;}

 static boost::once_flag init_;

 static int i_;

 // ...

};


int Conn::i_ = 0;

boost::once_flag Conn::init_ = BOOST_ONCE_INIT;


void worker() {

 boost::call_once(Conn::init, Conn::init_);

 // Выполнить реальную работу...

}


Conn с; // Возможно, вы не захотите использовать глобальную переменную,

    // тогда см. следующий рецепт


int main() {

 boost::thread_group grp;

 for (int i=0; i < 100; ++i) grp.create_thread(worker);

 grp.join_all();

 std::cout << с.i_ << '\n'; // c.i = i

}

Обсуждение

Совместно используемый ресурс должен где-то инициализироваться, и, возможно, вы захотите, чтобы это сделал тот поток, который первым стал его использовать. Переменная типа

once_flag
(реальный ее тип зависит от платформы) и функция
call_once
могут предотвратить повторную инициализацию объекта. Вам придется сделать две вещи.

Во-первых, проинициализируйте вашу переменную

once_flag
с помощью макропеременной
BOOST_ONCE_INIT
. Значение этой макропеременной зависит от платформы. В примере 12.5 класс
Conn
представляет собой некоторое соединение (базы данных, сокета, оборудования и т.д.), которое я хочу инициализировать лишь однажды, несмотря на то, что несколько потоков могут пытаться сделать то же самое. Подобная ситуация возникает довольно часто, когда требуется динамически загружать библиотеку, имя которой может быть задано, например, в конфигурационном файле приложения. Флаг
once_flag
— это переменная статического класса, потому что мне требуется только однократная инициализация независимо от количества существующих экземпляров этого класса. Поэтому я следующим образом устанавливаю этот флаг в начальное значение
BOOST_ONCE_INIT
.

boost::once_flag Conn::initFlag_ = BOOST_ONCE_INIT;

Затем в моей рабочей функции я вызываю

call_once
, которая синхронизирует доступ к моему инициализированному флагу и, следовательно, предотвращает параллельное выполнение другой инициализации. Я передаю в
call_once
два аргумента:

boost::call_once(Conn::init, Conn::initFlag_);

Первым аргументом является адрес функции, которая будет выполнять инициализацию. Второй аргумент — это флаг. В данном случае несколько потоков могут попытаться выполнить инициализацию, но только первый в этом преуспеет.

12.5. Передача аргумента функции потока

Проблема

Требуется передать аргумент в вашу функцию потока, однако средствами библиотеки Boost Threads предусматривается передача только функторов без аргументов.

Решение

Создайте адаптер функтора, который принимает ваши параметры и возвращает функтор без параметров. Адаптер функтора можно использовать там, где должен был бы быть функтор потока. Пример 12.6 показывает, как это можно сделать.

Пример 12.6. Передача аргументов функции потока

#include 

#include 

#include 

#include 


// typedef используется для того, чтобы приводимые ниже объявления лучше

// читались

typedef void (*WorkerFunPtr)(const std::string&);


template

 typename ParamT>    // тип ее параметра

struct Adapter {

 Adapter(FunT f, ParamT& p) : // Сконструировать данный адаптер и

  f_(f), p_(&p) {}       // установить члены на значение функции и ее

                // аргумента

 void operator()() { // Просто вызов функции с ее аргументом

  f_(*p_);

 }

private:

 FunT f_;

 ParamT* p_; // Использовать адрес параметра. чтобы избежать лишнего

       // копирования

};


void worker(const std::string& s) {

 std::cout << s << '\n';

}


int main() {

 std::string s1 = "This is the first thread!";

 std::string s2 = "This is the second thread!";

 boost::thread thr1(Adapter(worker, s1));

 boost::thread thr2(Adapter(worker, s2));

 thr1.join();

 thr2.join();

}

Обсуждение

Здесь приходится решать принципиальную проблему, причем характерную не только для потоков или проекта Boost, а общую проблему, возникающую при необходимости передачи функтора с одной сигнатурой туда, где требуется другая сигнатура. Решение состоит в создании адаптера.

Синтаксис может показаться немного путаным, но фактически в примере 12.6 создается временный функтор, который может вызываться конструктором потока как функция без аргументов (требуется именно такая функция). Но прежде всего используйте

typedef
, чтобы указатель функции лучше воспринимался в тексте.

typedef void (*WorkerFunPtr)(const std::string&);

Это создает тип

WorkerFunPtr
, который является указателем на функцию, принимающую по ссылке аргумент типа
string
и возвращающую тип
void
. После этого я создал шаблон класса
Adapter
. Он обеспечивает инстанцирование динамического функтора. Обратите внимание на конструктор:

template

 typename ParamT>

struct Adapter {

 Adapter(FunT f, ParamT& p) : f_(f), p_(&p) {}

 // ...

Конструктор только инициализирует два члена, которые могут быть любого типа, но нам нужно, чтобы это был указатель на функцию и некоторый параметр

p
любого типа. Ради повышения эффективности я сохраняю адрес параметра, а не его значение.

Теперь рассмотрим следующую строку главного потока.

boost::thread thr1(Adapter(worker, s1))

Аргумент конструктора

thr1
представляет собой реализацию шаблона класса
Adapter
, использующую в качестве параметров два типа
WorkerFunPtr
и
std::string
. Это именно те два типа, которые являются членами адаптера
f_
и
p_
. Наконец,
Adapter
перегружает
operator()
, поэтому он может вызываться как функция. Его вызов означает просто выполнение следующей функции.

f_(*p_);

Применяя шаблон класса

Adapter
, можно передавать аргументы функциям потока, причем делается это за счет лишь небольшого усложнения синтаксиса. Если требуется передавать еще один аргумент, просто добавьте дополнительный тип и переменную-член в шаблон
Adapter
. Этот подход привлекателен тем, что позволяет создавать набор шаблонов классов обобщенного адаптера и использовать их в различных контекстах.

Загрузка...