Глава 7 Проектирование параллельных структур данных без блокировок

В этой главе:

■ Реализация параллельных структур данных без использования блокировок.

■ Техника управления памятью в структурах данных без блокировок.

■ Простые рекомендации по написанию структур данных без блокировок.

В предыдущей главе мы рассмотрели общие аспекты проектирования параллельных структур данных и сформулировали общие рекомендации, как удостовериться в том, что спроектированная структура безопасна. Затем мы изучили несколько распространенных структур данных и показали, как можно их реализовать с применением мьютексов и блокировок для защиты разделяемых данных. В первых двух примерах мы использовали один мьютекс для защиты всей структуры данных, а в последующих — несколько мьютексов, защищающих более мелкие части структуры, что обеспечило более высокий уровень параллелизма при доступе к данным.

Мьютексы — это мощный механизм, позволяющий нескольким потокам безопасно обращаться к структуре данных, не нарушая инвариантов и не порождая гонок. Рассуждать о поведении кода, в котором используются мьютексы, сравнительно просто: код либо захватывает защищающий данные мьютекс, либо нет. Но не всё коту масленица — в главе 3 мы видели, что некорректное использование мьютексов может стать причиной взаимоблокировок, а при рассмотрении очереди и справочной таблицы показали, как гранулярность блокировок может влиять на истинно параллельное выполнение программы. Если бы удалось разработать структуры данных, с которыми можно было бы безопасно работать, не прибегая к блокировкам, то эти проблемы вообще не возникали бы. Такие структуры называются структурами данных без блокировок, или свободными от блокировок.

В этой главе мы рассмотрим, как можно использовать свойства упорядочения доступа к памяти, присущие атомарным операциям (см. главу 5), для настроения структур данных без блокировок. При проектировании таких структур надо проявлять крайнюю осторожность, потому что реализовать их правильно трудно, а условия, при которых проявляются ошибки, могут возникать очень редко. Начнем с ответа на вопрос, что понимается под структурой данных без блокировок. Затем остановимся на причинах, но которым такие структуры полезны, и, наконец, проработаем ряд примеров и дадим некоторые общие рекомендации.

7.1. Определения и следствия из них

Алгоритмы и структуры данных, в которых для синхронизации доступа используются мьютексы, условные переменные и будущие результаты, называются блокирующими. Приложение вызывает библиотечные функции, которые приостанавливают выполнение потока до тех пор, пока другой поток не завершит некоторое действие. Такие библиотечные функции называются блокирующими, потому что поток не может продвинуться дальше некоторой точки, пока не будет снят блокировка. Обычно ОС полностью приостанавливает заблокированный поток (и отдает его временные кванты другому потоку) до тех пор, пока он не будет разблокирован в результате выполнения некоторого действия в другом потоке, будь то освобождение мьютекса, сигнал условной переменной или перевод будущего результата в состояние готов.

Структуры данных и алгоритмы, в которые блокирующие библиотечные функции не используются, называются неблокирующими. Но не все такие структуры данных свободны от блокировок, поэтому давайте сначала рассмотрим различные типы неблокирующих структур.

7.1.1. Типы неблокирующих структур данных

В главе 5 мы реализовали простой мьютекс-спинлок с помощью

std::atomic_flag
. Этот код воспроизведён в листинге ниже.


Листинг 7.1. Реализация мьютекса-спинлока с помощью

std::atomic_flag

class spinlock_mutex {

 std::atomic_flag flag;

public:

 spinlock_mutex():

  flag(ATOMIC_FLAG_INIT) {}


 void lock() {

  while (flag.test_and_set(std::memory_order_acquire));

 }


 void unlock() {

  flag.clear(std::memory_order_release);

 }

};

Здесь не вызываются никакие блокирующие функции;

lock()
просто «крутится» в цикле, пока
test_and_set()
не вернет
false
. Отсюда и название спинлок (spin lock) — слово spin означает «крутиться». Как бы то ни было, блокирующих вызовов нет, и, значит, любая программа, в которой для защиты разделяемых данных используется такой мьютекс, будет неблокирующей. Однако она не свободна от блокировок. Это по-прежнему мьютекс, который в каждый момент времени может захватить только один поток. Теперь сформулируем определение свободы от блокировок и посмотрим, какие структуры данных под него подпадают.

7.1.2. Структуры данных, свободные от блокировок

Чтобы структура данных считалась свободной от блокировок, она должна быть открыта для одновременного доступа со стороны сразу нескольких потоков. Не требуется, чтобы потоки могли выполнять одну и ту же операцию; свободная от блокировок очередь может позволять одного потоку помещать, а другому — извлекать данные, но запрещать одновременное добавление данных со стороны двух потоков. Более того, если один из потоков, обращающихся к структуре данных, будет приостановлен планировщиком в середине операции, то остальные должны иметь возможность завершить операцию, не дожидаясь возобновления приостановленного потока.

Алгоритмы, в которых применяются операции сравнения с обменом, часто содержат циклы. Зачем вообще используются такие операции? Затем, что другой поток может в промежутке модифицировать данные, и тогда программа должна будет повторить часть операции, прежде чем попытается еще раз выполнить сравнение с обменом. Такой код может оставаться свободным от блокировок при условии, что сравнение с обменом в конце концов завершится успешно, если другие потоки будут приостановлены. Если это не так, то мы по существу получаем спинлок, то есть алгоритм неблокирующий, но не свободный от блокировок.

Свободные от блокировок алгоритмы с такими циклами могут приводить к застреванию (starvation) потоков, когда один поток, выполняющий операции с «неудачным» хронометражем, продвигается вперёд, а другой вынужден постоянно повторять одну и ту же операцию. Структуры данных, в которых такой проблемы не возникает, называются свободными от блокировок и ожидания.

7.1.3. Структуры данных, свободные от ожидания

Свободная от блокировок структура данных называется свободной от ожидания, если обладает дополнительным свойством: каждый обращающийся к ней поток может завершить свою работу за ограниченное количество шагов вне зависимости от поведения остальных потоков. Алгоритмы, в которых количество шагов может быть неограничено из-за конфликтов с другими потоками, не свободны от ожидания.

Корректно реализовать свободные от ожидания структуры данных чрезвычайно трудно. Чтобы гарантировать, что каждый поток сможет завершить свою работу за ограниченное количество шагов, необходимо убедиться, что каждая операция может быть выполнена за один проход и что шаги, выполняемые одним потоком, не приводят к ошибке в операциях, выполняемых другими потоками. В результате алгоритмы выполнения различных операций могут значительно усложниться. Учитывая, насколько трудно правильно реализовать структуру данных, свободную от блокировок и ожидания, нужно иметь весьма веские причины для того, чтобы взяться за это дело; требуется тщательно соотносить затраты с ожидаемым выигрышем. Поэтому обсудим, какие факторы влияют на это соотношение.

7.1.4. Плюсы и минусы структур данных, свободных от блокировок

Основная причина для использования структур данных, свободных от блокировок, — достижение максимального уровня параллелизма. В контейнерах с блокировками всегда есть возможность, что один поток будет приостановлен на время, пока другой не завершит операцию, — в конце концов, основное назначение мьютекса в том и состоит, чтобы предотвратить одновременный доступ за счет взаимного исключения. В случае структуры данных, свободной от блокировок, какой-то поток продвигается вперёд на каждом шаге. Если же структура еще и свободна от ожидания, то вперёд продвигаются все потоки, вне зависимости от того, что в это время делают другие, — необходимости ждать не возникает. Это свойство весьма желательно, но труднодостижимо. На этом пути очень легко скатиться к спинлоку.

Вторая причина для использования структур данных, свободных от блокировок, — надежность. Если поток завершается, не освободив блокировку, то вся структура данных безвозвратно испорчена. Но если такое происходит с потоком во время операции над структурой данных, свободной от блокировок, то не теряется ничего, кроме данных самого потока; остальные потоки продолжают нормально работать.

Но у этой медали есть и оборотная сторона: если вы не можете запретить потокам одновременный доступ к структуре, то должны внимательно следить за соблюдением инвариантов или выбирать альтернативные инварианты, соблюдение которых можно гарантировать. Кроме того, следует обращать внимание на ограничения упорядочения, налагаемые на операции. Чтобы избежать неопределённого поведения вследствие гонки за данными, следует использовать для всех модификаций атомарные операции. Но и этого недостаточно — необходимо гарантировать, что изменения становятся видны другим потокам в правильном порядке. Все это означает, что написание потокобезопасных структур данных без использования блокировок гораздо сложнее, чем с блокировками.

Ввиду отсутствия блокировок невозможны и взаимоблокировки, однако вместо них появляется угроза активных блокировок. Активная блокировка (live lock) возникает, когда два потока одновременно пытаются изменить структуру данных, но каждый из них должен начинать свою операцию сначала из-за изменений, произведенных другим потоком. Таким образом, каждый поток беспрестанно повторяет попытки в цикле. Представьте себе двух людей, пытающихся разойтись в узком проходе. Войдя в него одновременно, они сталкиваются лбами, поэтому оба отступают назад и пробуют еще раз. И так будет повторяться до тех пор, пока кто-то не проскочит первым (по взаимному согласию, потому что оказался быстрее или просто благодаря удаче). Как и в этом простом примере, активные блокировки обычно существуют недолго, потому что зависят от случайных временных соотношений при планировании потоков. Поэтому они скорее «подъедают» производительность, чем вызывают долгосрочные проблемы, но остерегаться их все равно стоит. По определению программа, свободная от блокировок, не страдает от активных блокировок, потому что существует ограничение сверху на количество шагов, необходимых для выполнения операции. Зато и алгоритм, скорее всего, окажется сложнее альтернативного и может потребовать большего числа шагов даже в случае, когда никакой другой поток одновременно не обращается к структуре данных.

Это подводит нас к еще одному недостатку кода, свободного от блокировок и ожидания: хотя он позволяет лучше распараллелить операции над структурой данных и сократить время ожидания в каждом конкретном потоке, общая производительность программы вполне может упасть. Во-первых, атомарные операции, используемые в свободном от блокировок коде, часто выполняются гораздо медленнее, чем неатомарные, а в структуре данных без блокировок их, скорее всего, будет гораздо больше, чем в аналогичной структуре с блокировками на основе мьютексов. К тому же, оборудование должно как-то синхронизировать данные между потоками, которые обращаются к одним и тем же атомарным переменным. В главе 8 мы увидим, что эффект перебрасывания кэша, возникающий из-за того, что несколько потоков обращаются к одним и тем же атомарным переменным, может привести к существенному падению производительности. Как обычно, необходимо тщательно анализировать аспекты, связанные с производительностью (время ожидания в худшем случае, среднее время ожидания, полное время выполнения и т.д.), для обоих решений — с блокировками и без, — прежде чем остановиться на каком-то одном.

А теперь перейдём к примерам.

7.2. Примеры структур данных, свободных от блокировок

Для демонстрации некоторых приёмов проектирования структур данных, свободных от блокировок, мы рассмотрим реализации ряда простых структур.

Как уже отмечалось, структуры данных, свободные от блокировок, опираются на использование атомарных операций и связанные с ними гарантии упорядочения доступа к памяти, благодаря которым можно быть уверенным, что изменения данных становятся видны потокам в правильном порядке. Сначала мы будем использовать во всех атомарных операциях принимаемое по умолчанию упорядочение

memory_order_seq_cst
, потому что оно проще всего для понимания (напомним, что семантика
memory_order_seq_cst
устанавливает полное упорядочение всех операций). Но позже мы посмотрим, как ослабить некоторые ограничения с помощью семантик
memory_order_acquire
,
memory_order_release
и даже
memory_order_relaxed
. Хотя ни в одном примере мьютексы не используются напрямую, не стоит забывать, что отсутствие блокировок гарантируется только для типа
std::atomic_flag
. На некоторых платформах в казалось бы свободном от блокировок коде могут использоваться внутренние блокировки, скрытые в реализации стандартной библиотеки С++ (детали см. в главе 5). В этом случае простая структура данных с блокировками может оказаться предпочтительнее, но дело не только в этом; прежде чем выбирать ту или иную реализацию, нужно четко сформулировать требования, а затем подвергнуть профилированию различные решения, удовлетворяющие этим требованиям.

Итак, снова начнем с простейшей структуры данных — стека.

7.2.1. Потокобезопасный стек без блокировок

Основное свойство стека понятно: элементы извлекаются в порядке, обратном тому, в котором помещались — последним пришёл, первым ушел (LIFO). Поэтому важно убедиться, что после добавления значения в стек оно может быть сразу же безопасно извлечено другим потоком и что только один поток получает данное значение. Простейшая реализация стека основана на связанном списке; указатель

head
направлен на первый узел (который будет извлечен следующим), и каждый узел указывает на следующий в списке. При такой схеме добавление узла реализуется просто.

1. Создать новый узел.

2. Записать в его указатель

next
текущее значение
head
.

3. Записать в

head
указатель на новый узел.

Все это прекрасно работает в однопоточной программе, но, когда стек могут модифицировать сразу несколько потоков, этого недостаточно. Существенно, что если узлы добавляют два потока, то между шагами 2 и 3 возможна гонка: второй поток может модифицировать значение

head
после того, как первый прочитает его на шаге 2, но до изменения на шаге 3. В таком случае изменения, произведенные вторым потоком, будут отброшены или случится еще что-нибудь похуже. Прежде чем решать эту проблему, следует отметить, что после того, как указатель
head
будет изменен и станет указывать на новый узел, этот узел может быть прочитан другим потоком. Поэтому крайне важно, чтобы новый узел был аккуратно подготовлен до того, как на него начнет указывать
head
; потом изменять узел уже нельзя.

Ну хорошо, а как все-таки быть с этим неприятным состоянием гонки? Ответ таков — использовать атомарную операцию сравнить-и-обменять на шаге 3, гарантирующую, что

head
не был модифицирован с момента чтения на шаге 2. Если был, то следует вернуться в начало цикла и повторить. В листинге ниже показано, как можно реализовать потокобезопасную функцию
push()
без блокировок.


Листинг 7.2. Реализация функции

push()
без блокировок

template

class lock_free_stack {

private:

 struct node {

  T data;

  node* next;

  node(T const& data_) : ←
(1)

  data(data_) {}

 };


 std::atomic head;


public:

 void push(T const& data) {

  node* const new_node = new node(data);←
(2)

  new_node->next = head.load();     ←
(3)

  while (!head.compare_exchange_weak(

      new_node->next, new_node));  ←
(4)

 }

};

В этом коде дотошно реализованы все три пункта изложенного выше плана: создать новый узел (2), записать в его поле

next
текущее значение
head
(3) и записать в
head
указатель на новый узел (4). Заполнив данные самой структуры
node
в конструкторе (1), мы гарантируем, что узел готов к использованию сразу после конструирования, так что легкая проблема решена. Затем мы вызываем функцию
compare_exchange_weak()
, которая проверяет, что указатель
head
по-прежнему содержит то значение, которое было сохранено в
new_node->next
(3), и, если это так, то записывает его в
new_node
. В этой части программы используется также полезное свойство сравнения с обменом: если функция возвращает
false
, означающее, что сравнение не прошло (например, потому что значение
head
было изменено другим потоком), то в переменную, которая передана в первом параметре (
new_node->next
) записывается текущее значение
head
. Поэтому нам не нужно перезагружать
head
на каждой итерации цикла — это сделает за нас компилятор. Кроме того, поскольку мы сразу переходим в начало цикла в случае неудачного сравнения, можно использовать функцию
compare_exchange_weak
, которая в некоторых архитектурах дает более оптимальный код, чем
compare_exchange_strong
(см. главу 5).

Итак, операции

pop()
у нас пока еще нет, но уже можно сверить реализацию
push()
с рекомендациями. Единственное место, где возможны исключения, — конструирование нового узла (1), но здесь все будет подчищено автоматически, и, поскольку список еще не модифицирован, то опасности нет. Поскольку мы сами строим данные, сохраняемые в узле
node
, и используем
compare_exchange_weak()
для обновления указателя
head
, то проблематичных состояний гонки здесь нет. Если операция сравнения с обменом завершилась успешно, то узел находится в списке, и его можно извлекать. Так как нет никаких блокировок, то нет и возможности взаимоблокировки, и, стало быть, функция
push()
успешно сдала экзамен.

Теперь, когда у нас есть средства для добавления данных в стек, надо научиться их извлекать обратно. На первый взгляд, тут всё просто.

1. Прочитать текущее значение

head
.

2. Прочитать

head->next
.

3. Записать в

head
значение
head->next
.

4. Вернуть поле

data
, хранящееся в извлеченном узле
node
.

5. Удалить извлеченный узел.

Однако наличие нескольких потоков осложняет дело. Если два потока пытаются удалить элементы из стека, то оба могут прочитать одно и то же значение

head
на шаге 1. Если затем один поток успеет выполнить все операции вплоть до шага 5, прежде чем другой доберется до шага 2, то второй поток попробует разыменовать висячий указатель. Это одна из самых серьезных проблем при написании кода, свободного от блокировок, поэтому пока мы просто опустим шаг 5, смирившись с утечкой узлов.

Однако на этом трудности не кончаются. Есть еще одна проблема: если два потока прочитают одно и то же значение

head
, то они вернут один и тот же узел. Это вступает в противоречие с самой идеей стека, поэтому должно быть предотвращено любой ценой. Решить проблему можно так же, как мы устранили гонку в
push()
: использовать для обновления
head
операцию сравнения с обменом. Если она завершается с ошибкой, значит, либо в промежутке был добавлен новый узел, либо другой поток только что извлек узел, который собирались извлечь мы. В любом случае нужно вернуться на шаг 1 (хотя операция сравнения с обменом автоматически перечитывает
head
).

Если сравнение с обменом завершилось успешно, то мы точно знаем, что больше ни один поток не пытался удалить данный узел из стека, поэтому можем без опаски выполнить шаг 4. Вот первая попытка написать код

pop()
:

template

class lock_free_stack {

public:

 void pop(T& result) {

  node* old_head = head.load();

  while (!head.compare_exchange_weak(old_head, old_head->next));

  result = old_head->data;

 }

};

Вроде бы всё красиво и лаконично, но, помимо утечки узлов, осталось еще две проблемы. Во-первых, этот код не работает для пустого списка: если указатель

head
нулевой, то при попытке прочитать
next
мы получим неопределённое поведение. Это легко исправить, сравнивая в цикле
while
значение
head
с
nullptr
: если стек оказался пуст, мы можем либо возбудить исключение, либо вернуть булевский индикатор успеха или ошибки.

Вторая проблема касается безопасности относительно исключений. Впервые подступаясь к потокобезопасному стеку в главе 3, мы видели, что простой возврат объекта по значению небезопасен относительно исключений: если исключение возникает во время копирования возвращаемого значения, то значение будет потеряно. Тогда передача ссылки на результат оказалась приемлемым решением, которое гарантировало неизменность стека в случае исключения. К сожалению, сейчас мы лишены такой роскоши; безопасно скопировать данные можно только тогда, когда мы точно знаем, что больше никакой поток не пытается вернуть данный узел, а это означает, что узел уже удален из стека. Следовательно, передача возвращаемого значения по ссылке больше не является преимуществом, с тем же успехом можно было бы вернуть его и по значению. Чтобы безопасно вернуть значение, придется воспользоваться другим вариантом, описанным в главе 3: возвращать интеллектуальный указатель на данные.

Возврат

nullptr
в качестве значения интеллектуального указателя будет означать, что данных в стеке нет, но беда в том, что теперь приходится выделять память из кучи. Если делать это в
pop()
, то получится, что мы ровным счетом ничего не выиграли, потому что выделение памяти может возбудить исключение. Вместо этого мы будем выделять память в
push()
, при помещении данных в стек — память-то для структуры
node
выделять приходится в любом случае. Возврат
std::shared_ptr<>
не возбуждает исключений, поэтому
pop()
теперь безопасна. Собрав все вместе, мы получим код, показанный в следующем листинге.


Листинг 7.3. Свободный от блокировок стек с утечкой узлов

template

class lock_free_stack {

private:

 struct node       
(1) Теперь данные

 {             │
удерживаются

  std::shared_ptr data;←┘
указателем

  node* next;


  node(T const& data_) :      
(2) Создаем std::shared_ptr

  data(std::make_shared(data))←┤
Для только что выде-

  {}                │
ленного T

 };


 std::atomic head;


public:

 void push(T const& data) {

  node* const new_node = new node(data);

  new_node->next = head.load();

  while (!head.compare_exchange_weak(new_node->next, new_node));

 }


 std::shared_ptr pop()

 {
(3) Перед разыменованием

  node* old_head = head.load();│
проверяем, что old_head

  while (old_head &&      ←┘
ненулевой указатель

  !head.compare_exchange_weak(old_head, old_head->next));

  return old_head ? old_head->data : std::shared_ptr();←
(4)

 }

};

Теперь данные удерживаются указателем (1), поэтому мы должны выделять память для них из кучи в конструкторе узле (2). Кроме того, перед тем как разыменовывать

old_head
в цикле
compare_exchange_weak()
(3), следует проверять указатель на нуль. Наконец, мы либо возвращаем ассоциированные с узлом данные, если узел имеется, либо нулевой указатель, если его нет (4). Отметим, что этот алгоритм свободен от блокировок, но не свободен от ожидания, потому что цикл
while
в
push()
и
pop()
теоретически может выполняться бесконечно, если
compare_exchange_weak()
будет каждый раз возвращать
false
.

Если бы у нас был сборщик мусора (как в управляемых языках типа С# или Java), то на этом можно было бы ставить точку — старый узел был бы убран и повторно использован после того, как все потоки перестанут к нему обращаться. Но сегодня мало найдётся компиляторов С++ с встроенным сборщиком мусора, поэтому прибираться за собой нужно самостоятельно.

7.2.2. Устранение утечек: управление памятью в структурах данных без блокировок

При первом подходе к

pop()
мы решили смириться с утечкой узлов, чтобы избежать гонки в случае, когда один поток удаляет узел, а другой в это время хранит указатель на него, который собирается разыменовать. Однако в корректной программе на С++ утечка памяти, конечно, недопустима, так что с этим надо что-то делать. Сейчас мы покажем, как эта проблема решается.

Основная трудность состоит в том, что мы хотим освободить занятую узлом память, но не можем сделать это, пока нет уверенности, что никакой другой поток не хранит указателей на нее. Если бы в каждый момент времени только один поток вызывал

pop()
для данного экземпляра стека, то все было бы прекрасно. Функция
push()
не обращается к уже добавленному в стек узлу, так что кроме потока, вызвавшего
pop()
, этот узел больше никого не интересует, и его можно безопасно удалить.

С другой стороны, если мы все-таки хотим, чтобы несколько потоков могли одновременно вызывать

pop()
, то нужно каким-то образом отслеживать момент, когда удаление узла становится безопасным. По сути дела, это означает, что необходимо написать специализированный сборщик мусора для узлов
node
. Звучит пугающе, и эта задача действительно не самая простая, но на практике все не так плохо: мы проверяем только указатели на
node
и только те узлы, к которым обращается
pop()
. Нас не интересуют узлы внутри
push()
, потому что они доступны только одному потоку, пока не окажутся в стеке. А вот внутри
pop()
к одному и тому же узлу могут одновременно иметь доступ несколько потоков.

Если потоков, вызывающих

pop()
, нет вообще, то можно без опаски удалить все узлы, ожидающие удаления. Поэтому, если после извлечения данных помещать узлы в список «подлежат удалению», то их можно будет удалить одним махом в момент, когда не будет потоков, вызывающих
pop()
. Но как узнать, что потоков, вызывающих
pop()
, действительно нет? Очень просто — подсчитывать их. Если увеличивать счетчик при входе и уменьшать при выходе, то удалять узлы из списка «подлежащих удалению» можно будет, когда счетчик становится равным нулю. Разумеется, сам счетчик должен быть атомарным, чтобы к нему можно было безопасно обращаться из нескольких потоков. В листинге 7.4 показала исправленная функция
pop()
, а в листинге 7.5 — вспомогательные функции, используемые в ее реализации.


Листинг 7.4. Освобождение занятой узлами памяти в момент, когда нет потоков, вызывающих

pop()

template

class lock_free_stack {

private:

 std::atomic threads_in_pop;←┐
Атомарная

 void try_reclaim(node* old_head);  
(1) переменная


public:

 std::shared_ptr pop()

 {               
(2) Увеличить счетчик

  ++threads_in_pop;      ←┤
перед тем, как что-то

  node* old_head = head.load();│
делать

  while (old_head &&

  !head.compare_exchange_weak(old_head, old_head->next));

  std::shared_ptr res;

  if (old_head)

  {             
(3) He копировать

  res.swap(old_head->data);←┤
указатель, а извлечь

  }              │
данные из узла

  try_reclaim(old_head);←┐
Освободить удаленные

  return res;      
(4) узлы, если получится

 }

};

Атомарная переменная

threads_in_pop
(1) нужна для подсчета потоков, которые в данный момент пытаются извлечь элемент из стека. Она увеличивается на единицу в начале
pop()
(2) и уменьшается на единицу внутри функции
try_reclaim()
, которая вызывается после изъятия узла из списка (4). Поскольку мы откладываем удаление самого узла, то можно с помощью
swap()
переместить из него данные (3), а не просто скопировать указатель; тогда данные будут автоматически удалены, когда в них отпадает необходимость, вместо того, чтобы занимать память только потому, что на них ссылается еще не удаленный узел. В следующем листинге показан код функции
try_reclaim()
.


Листинг 7.5. Механизм освобождения памяти на основе подсчёта ссылок

template

class lock_free_stack {

private:

 std::atomic to_be_deleted;


 static void delete_nodes(node* nodes) {

  while (nodes) {

  node* next = nodes->next;

  delete nodes;

  nodes = next;

  }

 }


 void try_reclaim(node* old_head) {


  if (threads_in_pop == 1) ←
(1)

  {     
Заявляем права на список подлежащих удалению узлов (2)

  node* nodes_to_delete = to_be_deleted.exchange(nullptr);←┘

  if (!--threads_in_pop)←┐
Я — единственный

  {            
(3) поток в pop()?

   delete_nodes(nodes_to_delete);    ←
(4)

  } else if(nodes_to_delete) {      ←
(5)

   chain_pending_nodes(nodes_to_delete);←
(6)

  }

  delete old_head; ←
(7)

  } else {

  chain_pending_node(old_head); ←
(8)

  --threads_in_pop;

  }

 }


 void chain_pending_nodes(node* nodes) {

  node* last = nodes;

  while (node* const next =

      last->next) {←┐  
По указателям

(9) next доходим до

  last = next;     │  
конца списка

  }

  chain_pending_nodes(nodes, last);

 }


 void chain_pending_nodes(node* first, node* last) {

  last->next = to_be_deleted;←
(10)

  while (

  !to_be_deleted.compare_exchange_weak(←┐  
цикл гарантиру-

  last->next, first));         ├
(11)ет, что last->next

  }                   │  
корректно


 void chain_pending_node(node* n) {

  chain_pending_nodes(n, n);←
(12)

 }

};

Если при попытке освободить занятую узлом (1) память счетчик

threads_in_pop
оказывается равен 1, то данный поток — единственный в
pop()
, и, значит, можно безопасно удалять только что исключенный из списка узел (7) и, быть может, также узлы, ожидающие удаления. Если счетчик не равен 1, то никакие узлы удалять нельзя, поэтому мы просто добавляем узел в список ожидающих (8).

Предположим, что

threads_in_pop
равно 1. Тогда нам нужно освободить ожидающие узлы, потому что если этого не сделать, то они так и будут ожидать удаления до момента уничтожения стека. Для этого мы запрашиваем монопольные права на список с помощью атомарной операции
exchange
(2), после чего уменьшаем на единицу счетчик
threads_in_pop
(3). Если в результате счетчик оказался равным нулю, значит, больше ни один поток не работает со списком ожидающих удаления узлов. По ходу дела могли появиться новые ожидающие узлы, но сейчас — когда можно безопасно очистить список — нам это безразлично. Мы просто вызываем функцию
delete_nodes
, которая обходит список и удаляет узлы (4).

Если счетчик после уменьшения не равен нулю, то освобождать узлы небезопасно, поэтому если такие узлы есть (5), то нужно поместить их в начало списка ожидающих (6). Такое может случиться, если к структуре данных одновременно обращаются несколько потоков. Другие потоки могли вызвать

pop()
в промежутке между первой проверкой
threads_in_pop
(1) и «заявлением прав» на список (2) и добавить в список узлы, к которым все еще обращается один или несколько потоков. На рис. 7.1 поток С добавляет узел Y в список
to_be_deleted
, несмотря на то, что поток В все еще ссылается на него по указателю
old_head
и, значит, будет пробовать читать его указатель
next
. Поэтому поток А не может удалить узлы без риска вызвать неопределенное поведение в потоке В.

Рис. 7.1. На примере трех потоков, вызывающих

pop()
, видно, почему необходимо проверять
threads_in_pop
после заявления прав на список узлов, ожидающих удаления, в
try_reclaim()

Чтобы поместить узлы, ожидающие удаления, в список ожидающих, мы используем уже имеющийся указатель

next
для связывания. Для того чтобы добавить цепочку в список, мы проходим до конца цепочки (9), записываем в указатель
next
в последнем узле текущее значение
to_be_deleted
(10) и сохраняем указатель на первый узел цепочки как новый указатель
to_be_deleted
(11). Здесь необходимо вызывать
compare_exchange_weak
в цикле, чтобы не допустить утечки узлов, добавленных другим потоком. В результате в
next
записывается указатель на последний узел цепочки, если он изменился. Добавление единственного узла в список — это особый случай, когда первый узел в добавляемой цепочке совпадает с последним (12).

Этот алгоритм работает вполне приемлемо, если нагрузка невелика, то есть существуют моменты затишья, когда в

pop()
нет ни одного потока. Но эта ситуация кратковременна; именно поэтому мы должны проверять, что счетчик
threads_in_pop
после уменьшения обратился в нуль (3), прежде чем освобождать память, и по той же причине проверка стоит до удаления только что изъятого из стека узла (7). Удаление узла может занять относительно много времени, а мы хотим, чтобы окно, в котором другие потоки могут модифицировать список, было как можно короче. Чем больше времени проходит между моментом, когда поток впервые обнаружил, что
threads_in_pop
равно 1, и попыткой удалить узлы, тем больше шансов, что какой-то другой поток вызовет
pop()
, после чего
threads_in_pop
перестанет быть равно 1 и, стало быть, удалять узлы станет нельзя.

Если нагрузка высока, то затишье может не наступить никогда, поскольку новые потоки входят в

pop()
до того, как пребывавшие там успевают выйти. В таком случае список
to_be_deleted
будет расти неограниченно, и мы снова сталкиваемся с утечкой памяти. Если периодов затишья не ожидается, то необходим другой механизм освобождения узлов. Главное здесь — определить, когда ни один поток больше не обращается к конкретному узлу, который, следовательно, можно освободить. Из всех возможных механизмов такого рода для понимания проще всего тот, в котором используются указатели опасности (hazard pointers).

7.2.3. Обнаружение узлов, не подлежащих освобождению, с помощью указателей опасности

Термин указатели опасности откосится к технике, предложенной Магедом Майклом (Maged Michael)[12]. Они называются так потому, что удаление узла, на который все еще могут ссылаться другие потоки, — опасное предприятие. Если действительно существует поток, ссылающийся на данный узел, и этот поток попытается обратиться к нему по ссылке, то мы получим неопределенное поведение. Основная идея заключается в том, что поток, собирающийся получить доступ к объекту, который другой поток может захотеть удалить, сначала устанавливает указатель опасности, ссылающийся на этот объект, информируя тем самым другой поток, что удалять этот объект действительно опасно. После того как объект перестает быть нужным, указатель опасности очищается. Если вам доводилось наблюдать гребную регату между командами Оксфорда и Кембриджа, то вы могли видеть аналогичный механизм, применяемый в начале заезда: рулевой в лодке может поднять руку, сообщая, что экипаж еще не готов. Пока хотя бы один рулевой держит руку поднятой, судья не может дать старт заезду. После того как оба рулевых опустят руки, судья может давать старт, однако рулевой вправе снова поднять руку, если заезд еще не начался, а ситуация, на его взгляд, изменилась.

Собираясь удалить объект, поток должен сначала проверить указатели опасности в других имеющихся в системе потоках. Если ни один из указателей опасности не ссылается на данный объект, то его можно спокойно удалять. В противном случае удаление следует отложить. Периодически поток просматривает список отложенных объектов в поисках тех, которые уже можно удалить.

Высокоуровневое описание выглядит достаточно простым, но как это сделать на С++?

Прежде всего, необходимо место для хранения указателя на интересующий нас объект — сам указатель опасности. Это место должно быть видно из всех потоков, причем указатель опасности должен существовать в каждом потоке, который может получить доступ к структуре данных. Корректное и эффективное выделение такого места — непростая задача, поэтому отложим ее на потом, а пока предположим, что существует функция

get_hazard_pointer_for_current_thread()
, которая возвращает ссылку на указатель опасности. Затем нужно установить указатель опасности перед чтением указателя, который мы намерены разыменовать, — в данном случае указателя
head
на начало списка:

std::shared_ptr pop() {

 std::atomic& hp =

 get_hazard_pointer_for_current_thread();

 node* old_head = head.load();←
(1)

 node* temp;

 do {

  temp = old_head;

  hp.store(old_head);     ←
(2)

  old_head = head.load();

 } while (old_head != temp); ←
(3)

 // ...

}

Это необходимо делать в цикле

while
, чтобы узел
node
случайно не был удалён между чтением старого указателя
head
(1) и установкой указателя опасности (2). В течение этого промежутка времени ни один поток не знает, что мы собираемся обратиться к этому узлу. К счастью, если кто-то собирается удалить старый узел
head
, то сам указатель
head
должен был быть изменен, так что мы можем это проверить и не выходить из цикла, пока не будем твердо уверены, что указатель
head
по-прежнему имеет то же значение, которое было записано в указатель опасности (3). Такое использование указателей опасности опирается на тот факт, что можно безопасно использовать значение указателя даже после того, как объект, на который он указывает, уже удалён. Технически для стандартных реализаций
new
и
delete
это считается неопределенным поведением, поэтому либо убедитесь, что ваша реализация стандартной библиотеки допускает такое использование, либо реализуйте собственный распределитель.

Установив указатель опасности, мы можем продолжить выполнение

pop()
, будучи уверены, что ни один другой поток не попытается «вытащить» из-под нас узлы. Ну почти уверены: при каждом перечитывании
old_head
необходимо обновлять указатель опасности перед тем, как разыменовывать вновь прочитанное значение указателя. После того как узел извлечён из списка, мы можем очистить наш собственный указатель опасности. Если на наш узел не ссылаются другие указатели опасности, то его можно удалять; в противном случае его следует поместить в список узлов, ожидающих удаления. В листинге ниже приведен полный код функции
pop()
, реализованной по такой схеме.


Листинг 7.6. Реализация функции

pop()
с помощью указателей опасности

std::shared_ptr pop() {

 std::atomic& hp =

  get_hazard_pointer_for_current_thread();

 node* old_head = head.load();

 do {

  node* temp;
(1) Цикл, пока указатель

  do     ←┤
опасности не установлен

  {      │
на head

  temp = old_head;

  hp.store(old_head);

  old_head = head.load();

 } while (old_head != temp);

 }

 while (old_head &&

  !head.compare_exchange_strong(old_head, old_head->next))

  hp.store(nullptr);←
(2) Закончив, очищаем указатель опасности

 std::shared_ptr res;

 if (old_head) {

  res.swap(old_head->data);

  if (outstanding_hazard_pointers_for(old_head))←┐
Прежде чем

  {                        ├
(3) удалять узел,

  reclaim_later(old_head);           │
проверяем,
(4)

  }                        │
нет ли ссы-

  else                      │
лающихся на

  {                        │
него указате-

лей опасности

  delete old_head; ←
(5)

  }

  delete_nodes_with_no_hazards();←
(6)

 }

 return res;

}

Начнём с того, что мы перенесли цикл, в котором устанавливается указатель опасности, во внешний цикл, где перечитывается

old_head
, если операция сравнения с обменом завершается неудачно (1). Здесь мы используем функцию
compare_exchange_strong()
, потому что фактическая работа делается внутри цикла
while
: ложный отказ в
compare_exchange_weak()
привел бы к ненужному сбросу указателя опасности. Таким образом, гарантируется, что указатель опасности установлен перед разыменованием
old_head
. Заявив свои права на узел, мы можем очистить указатель опасности (2). Получив узел в свое распоряжение, мы должны проверить, не ссылаются ли на него указатели опасности, принадлежащие другим потокам (3). Если это так, то удалять узел пока нельзя, а нужно поместить его в список ожидающих (4); в противном случае узел можно удалять немедленно (5). Наконец, мы добавили вызов функции, в которой проверяется, существуют ли узлы, для которых мы ранее вызывали
reclaim_later()
. Если не осталось указателей опасности, ссылающихся на эти узлы, то мы можем спокойно удалить их (6). Те же узлы, на которые еще ссылается хотя бы один указатель опасности, остаются в списке и будут проверены следующим потоком, вызвавшим
pop()
.

Разумеется, в новых функциях —

get_hazard_pointer_for_current_thread()
,
reclaim_later()
,
outstanding_hazard_pointers_for()
и
delete_nodes_with_no_hazards()
— скрыта масса деталей, поэтому отдёрнем занавес и посмотрим, как они работают.

Как именно в функции

get_hazard_pointer_for_current_thread()
выделяется память для принадлежащих потокам указателей опасности, несущественно для логики программы (хотя, как будет показано ниже, может влиять на эффективность). Поэтому пока ограничимся простой структурой: массивом фиксированного размера, в котором хранятся пары (идентификатор потока, указатель). Функция
get_hazard_pointer_for_current_thread()
ищет в этом массиве первую свободную позицию и записывает в поле ID идентификатор текущего потока. Когда поток завершается, эта позиция освобождается — в поле ID заносится сконструированное по умолчанию значение
std::thread::id()
. Этот алгоритм показан в следующем листинге.


Листинг 7.7. Простая реализация функции

get_hazard_pointer_for_current_thread

unsigned const max_hazard_pointers = 100;

struct hazard_pointer {

 std::atomic id;

 std::atomic pointer;

};


hazard_pointer hazard_pointers[max_hazard_pointers];


class hp_owner {

 hazard_pointer* hp;

public:

 hp_owner(hp_owner const&) = delete;

 hp_owner operator=(hp_owner const&) = delete;

 hp_owner() :

  hp(nullptr) {

 for (unsigned i = 0; i < max_hazard_pointers; ++i) {

  std::thread::id old_id;

  if (

   hazard_pointers[i].

   id.compare_exchange_strong(     ←┐

   old_id, std::this_thread::get_id()))│
Пытаемся заявить

  {                   │
права на владе-

  hp = &hazard_pointers[i];      │
ние указателем

   break;                │
опасности

  }

  }

  if (!hp) {←
(1)

  throw std::runtime_error("No hazard pointers available");

  }

 }


 std::atomic& get_pointer() {

  return hp->pointer;

 }


 ~hp_owner() {←
(2)

  hp->pointer.store(nullptr);

  hp->id.store(std::thread::id());

 }

};


std::atomic& get_hazard_pointer_for_current_thread()←
(3)

{                   
(4) У каждого потока

 thread_local static hp_owner hazard;←┘
свой указатель опасности

 return hazard.get_pointer();←
(5)

}

Реализация самой функции

get_hazard_pointer_for_current_thread()
обманчиво проста (3): в ней объявлена переменная типа
hp_owner
в поточно-локальной памяти (4), в которой хранится принадлежащий данному потоку указатель опасности. Затем она просто возвращает полученный от этого объекта указатель (5). Работает это следующим образом: в первый раз, когда каждый поток вызывает эту функцию, создается новый экземпляр
hp_owner
. Его конструктор (1) ищет в таблице пар (владелец, указатель) незанятую запись (такую, у которой нет владельца). На каждой итерации цикла он с помощью
compare_exchange_strong()
атомарно выполняет два действия: проверяет, что у текущей записи нет владельца, и делает владельцем себя (2). Если
compare_exchange_strong()
возвращает
false
, значит, записью владеет другой поток, поэтому мы идем дальше. Если же функция вернула
true
, то мы успешно зарезервировали запись для текущего потока, поэтому можем сохранить ее адрес и прекратить поиск (3). Если мы дошли до конца списка и не обнаружили свободной записи (4), значит, потоков, использующих указатель опасности, слишком много, так что приходится возбуждать исключение.

После того как экземпляр

hp_owner
для данного потока создан, последующие обращения происходят гораздо быстрее, потому что указатель запомнен и просматривать таблицу снова нет нужды.

Когда завершается поток, для которого был создан объект

hp_owner
, этот объект уничтожается. Прежде чем сохранить в идентификаторе владельца значение
std::thread::id()
, деструктор записывает в сам указатель значение
nullptr
, чтобы другие потоки могли повторно использовать эту запись. При такой реализации
get_hazard_pointer_for_current_thread()
реализация функции
outstanding_hazard_pointers_for()
совсем проста: требуется только найти переданное значение в таблице указателей опасности:

bool outstanding_hazard_pointers_for(void* p) {

 for (unsigned i = 0; i < max_hazard_pointers; ++i) {

  if (hazard_pointers[i].pointer.load() == p) {

  return true;

  }

 }

 return false;

}

He нужно даже проверять, есть ли у записи владелец, так как в бесхозных записях все равно хранятся нулевые указатели, поэтому сравнение заведомо вернёт

false
; это еще упрощает код. Теперь функции
reclaim_later()
и
delete_nodes_with_no_hazards()
могут работать с простым связанным списком;
reclaim_later()
добавляет в него узлы, a
delete_nodes_with_no_hazards()
удаляет узлы, на которые не ссылаются указатели опасности. Реализация обеих функций приведена в следующем листинге.


Листинг 7.8. Простая реализация функций освобождения узлов

template

void do_delete(void* p) {

 delete static_cast(p);

}


struct data_to_reclaim {

 void* data;

 std::function deleter;

 data_to_reclaim* next;


 template

 data_to_reclaim(T* p) : ←
(1)

  data(p),

  deleter(&do_delete), next(0) {}


 ~data_to_reclaim() {

  deleter(data); ←
(2)

 }

};


std::atomic nodes_to_reclaim;


void add_to_reclaim_list(data_to_reclaim* node) {←
(3)

 node->next = nodes_to_reclaim.load();

 while (

  !nodes_to_reclaim.compare_exchange_weak(node->next, node));

}


template

void reclaim_later(T* data) {          ←
(4)

 add_to_reclaim_list(new data_to_reclaim(data));←
(5)

}


void delete_nodes_with_no_hazards() {

 data_to_reclaim* current =

 nodes_to_reclaim.exchange(nullptr);          ←
(6)

 while(current) {

  data_to_reclaim* const next = current->next;

  if (!outstanding_hazard_pointers_for(current->data)) {←
(7)

  delete current;                    ←
(8)

  } else {

  add_to_reclaim_list(current);             ←
(9)

  }

  current = next;

 }

}

Полагаю, вы обратили внимание, что

reclaim_later()
— шаблон функции, а не обычная функция (4). Объясняется это тем, что указатели опасности — это универсальный механизм, поэтому не стоит ограничивать его только узлами стека. Ранее для хранения указателей мы использовали тип
std::atomic
. Поэтому мы должны обрабатывать произвольный указательный тип, но просто указать
void*
нельзя, так как мы собираемся удалять данные по указателю, а оператору
delete
нужно знать реальный тип указателя. Как мы скоро увидим, конструктор
data_to_reclaim
прекрасно справляется с этой проблемой:
reclaim_later()
просто создает новый экземпляр
data_to_reclaim
для переданного указателя и добавляет его в список отложенного освобождения (5). Сама функция
add_to_reclaim_list()
(3) — не более чем простой цикл по
compare_exchange_weak()
для головного элемента списка; мы уже встречались с такой конструкцией раньше.

Но вернёмся к конструктору

data_to_reclaim
(1), который также является шаблоном. Он сохраняет подлежащие удалению данные в виде указателя
void*
в члене
data
, после чего запоминает указатель на подходящую конкретизацию
do_delete()
— простую функцию, которая приводит тип
void*
к типу параметризованного указателя, а затем удаляет объект, на который он указывает. Шаблон
std::function<>
безопасно обертывает этот указатель на функцию, так что впоследствии деструктору
data_to_reclaim
для удаления данных нужно всего лишь вызвать запомненную функцию (2).

Деструктор

data_to_reclaim
не вызывается, когда мы добавляем узлы в список; он вызывается только, когда на узел не ссылается ни один указатель опасности. За это отвечает функция
delete_nodes_with_no_hazards()
.

Эта функция сначала заявляет права на владение всем списком подлежащих освобождению узлов, вызывая

exchange()
(6). Это простое, но крайне важное действие гарантирует, что данный конкретный набор узлов будет освобождать только один поток. Другие потоки вправе добавлять в список новые узлы и даже пытаться освободить их, никак не затрагивая работу этого потока.

Далее мы по очереди просматриваем все узлы списка, проверяя, ссылаются ли на них не сброшенные указатели опасности (7). Если нет, то запись можно удалить (очистив хранящиеся в ней данные) (8). В противном случае мы возвращаем элемент в список, чтобы освободить его позже (9).

Хотя эта простая реализация справляется с задачей безопасного освобождения удаленных узлов, она заметно увеличивает накладные расходы. Для просмотра массива указателей опасности требуется проверить

max_hazard_pointers
атомарных переменных, и это делается при каждом вызове
pop()
. Атомарные операции по необходимости работают медленно — зачастую в 100 раз медленнее эквивалентных обычных операций на настольном ПК, — поэтому
pop()
оказывается дорогостоящей операцией. Мало того что приходится просматривать список указателей опасности для исключаемого из списка узла, так еще надо просмотреть его для каждого узла в списке ожидающих освобождения. Понятно, что это не слишком удачная идея. В списке может храниться
max_hazard_pointers
узлов, и каждый из них нужно сравнить с
max_hazard_pointers
хранимых указателей опасности. Черт! Должно существовать решение получше.

Более быстрые стратегии освобождения с применением указателей опасности

И оно, конечно же, существует. Показанное выше решение — это простая и наивная реализация указателей опасности, которую я привел, только чтобы объяснить идею. Первое, что можно сделать, — пожертвовать памятью ради быстродействия. Вместо того чтобы проверять каждый узел в списке освобождения при каждом обращении к

pop()
, мы вообще не будем пытаться освобождать узлы, пока их число в списке не превысит
max_hazard_pointers
. Тогда мы гарантированно сможем освободить хотя бы один узел. Но если просто ждать, пока в списке накопится
max_hazard_pointers+1
узлов, то выиграем мы немного. После того как число узлов достигает
max_hazard_pointers
, мы будем пытаться освобождать их почти при каждом вызове
pop()
, так что проблема лишь немного отодвинулась во времени. Но если дождаться, пока в списке наберётся
2*max_hazard_pointers
узлов, то мы гарантированно сможем освободить по крайней мере
max_hazard_pointers
узлов, и тогда следующую попытку нужно будет делать не раньше, чем через
max_hazard_pointers
обращений к
pop()
. Это уже гораздо лучше. Вместо того чтобы просматривать
max_hazard_pointers
узлов при каждом вызове
pop()
(и, возможно, ничего не освободить), мы проверяем
2*max_hazard_pointers
через каждые
max_hazard_pointers
вызовов
pop()
и освобождаем не менее
max_hazard_pointers
. Получается, что в среднем мы проверяем два узла при каждом вызове
pop()
, и один из них точно освобождается.

Но и у этого решения есть недостаток (помимо увеличенного расхода памяти): теперь мы должны подсчитывать узлы в списке освобождения, то есть использовать атомарный счетчик, а, кроме того, за доступ к самому списку конкурируют несколько потоков. Если память позволяет, то можно предложить еще более эффективную схему освобождения: каждый поток хранит собственный список освобождения в поточно-локальной памяти. Тогда ни для счетчика, ни для доступа к списку не понадобятся атомарные переменные. Но в обмен придется выделить память для

max_hazard_pointers * max_hazard_pointers
узлов. Если поток завершается прежде, чем освобождены все его узлы, то оставшиеся можно перенести в глобальный список, как и раньше, а затем добавить в локальный список следующего потока, пожелавшего выполнить процедуру освобождения.

Еще один недостаток указателей опасности состоит в том, что они защищены патент пой заявкой, поданной IBM[13]. Если вы пишете программное обеспечение, которое будет применяться в стране, где эти патенты признаются, то придется получить соответствующую лицензию. Это проблема, общая для многих методов освобождения памяти без блокировок; поскольку в этой области ведутся активные исследования, крупные компании берут патенты всюду, где могут. Возможно, вы задаетесь вопросом, зачем я посвятил так много страниц описанию техники, которой многие не смогут воспользоваться. Что ж, вопрос не праздный. Во-первых, в некоторых случаях ей можно воспользоваться, не платя лицензионных отчислений. Например, если вы разрабатываете бесплатную программу на условиях лицензии GPL[14], то она может подпадать под заявление IBM об отказе от патентных притязаний[15]. Во-вторых — и это более существенно — объяснение техники помогает высветить вещи, о которых надо помнить при написании кода, свободного от блокировок, например, о плате за атомарные операции.

А существуют ли непатентованные методы освобождения памяти, применимые в программах без блокировок? К счастью, да. Один из них — подсчет ссылок.

7.2.4. Нахождение используемых узлов с помощью подсчета ссылок

В разделе 7.2.2 мы видели, что проблема удаления узлов сводится к задаче нахождения узлов, к которым еще обращаются потоки-читатели. Если бы можно было точно узнать, на какие узлы есть ссылки и когда количество ссылок обращается в нуль, то узлы можно было бы удалить. Указатели опасности решают эту проблему путем хранения списка используемых узлов, а механизм подсчета ссылок — путем хранения числа потоков, обращающихся к каждому узлу.

Выглядит просто и элегантно, но реализовать на практике очень трудно. Сразу приходит в голову мысль, что для такой задачи подошло бы что-то вроде

std::shared_ptr<>
— ведь это и есть указатель с подсчетом ссылок. Увы, хотя некоторые операции над
std::shared_ptr<>
атомарны, не гарантируется, что они свободны от блокировок. Сам по себе класс
std::shared_ptr<>
ничем не хуже прочих с точки зрения операций над атомарными типами, но он рассчитан на применение в самых разных контекстах, и попытка сделать атомарные операции над ним свободными от блокировок, скорее всего, привела бы к увеличению накладных расходов при любом его использовании. Если на вашей платформе функция
std::atomic_is_lock_free(&some_shared_ptr)
возвращает
true
, то проблему освобождения памяти можно считать полностью решенной. Достаточно хранить в списке объекты
std::shared_ptr
, как показано в следующем листинге.


Листинг 7.9. Свободный от блокировок стек на основе свободной от блокировок реализации

std::shared_ptr<>

template

class lock_free_stack {

private:

 struct node {

  std::shared_ptr data;

  std::shared_ptr next;

  node(T const& data_) :

  data(std::make_shared(data_)) {}

 };


 std::shared_ptr head;


public:

 void push(T const& data) {

  std::shared_ptr const new_node =

  std::make_shared(data);

  new_node->next = head.load();

  while (!std::atomic_compare_exchange_weak(

  &head, &new_node->next, new_node));

 }


 std::shared_ptr pop() {

  std::shared_ptr old_head = std::atomic_load(&head);

  while(old_head && !std::atomic_compare_exchange_weak(

  &head, &old_head, old_head->next));

  return old_head ? old_head->data : std::shared_ptr();

 }

};

В том весьма вероятном случае, когда реализация

std::shared_ptr<>
не свободна от блокировок, управлять подсчетом ссылок придется самостоятельно.

В одном из возможных способов используется не один, а два счетчика ссылок — внутренний и внешний. Их сумма равна общему количеству ссылок на узел. Внешний счетчик хранится вместе с указателем на узел и увеличивается при каждом чтении этого указателя. Когда читатель закапчивает работу с узлом, он уменьшает его внутренний счетчик. Таким образом, по завершении простой операции чтения указателя внешний счетчик увеличится на единицу, а внутренний уменьшится на единицу.

Когда необходимость в связи между внешним счетчиком и указателем отпадает (то есть узел невозможно получить из области памяти, доступной другим потокам), внутренний счетчик увеличивается на величину внешнего минус 1, а внешний отбрасывается. Если внутренний счетчик обратился в нуль, значит, никаких ссылок на узел извне не осталось и его можно удалять. Для обновления разделяемых данных по-прежнему необходимо применять атомарные операции. Теперь рассмотрим реализацию свободного от блокировок стека, в которой эта техника используется для гарантии того, что узлы освобождаются, только когда это безопасно.

В листинге ниже показала внутренняя структура данных и реализация функции

push()
— простая и элегантная.


Листинг 7.10. Помещение узла в свободный от блокировок стек с разделённым счётчиком ссылок

template

class lock_free_stack {

private:

 struct node;


 struct counted_node_ptr {←
(1)

  int external_count;

  node* ptr;

 };


 struct node {

  std::shared_ptr data;←
(2)

  std::atomic internal_count;

  counted_node_ptr next;  ←
(3)

  node(T const& data_) :

  data(std::make_shared(data_)), internal_count(0) {}

 };


 std::atomic head;←
(4)


public:

 ~lock_free_stack() {

  while(pop());

 }


 void push(T const& data) {←
(5)

  counted_node_ptr new_node;

  new_node.ptr = new node(data);

  new_node.external_count = 1;

  new_node.ptr->next = head.load();

  while(

  !head.compare_exchange_weak(new_node.ptr->next, new_node));

 }

};

Обратите внимание, что внешний счетчик хранится вместе с указателем на узел в структуре

counted_node_ptr
(1). Эта структура затем используется для представления указателя
next
в структуре
node
(3), где хранится также внешний счетчик (2). Поскольку
counted_node_ptr
определена как
struct
, то ее можно использовать в шаблоне
std::atomic<>
для представления головы списка
head
(4).

На платформах, где поддерживается операция сравнения и обмена двойного слова, размер этой структуры достаточно мал для того, чтобы тип данных

std::atomic
был свободен от блокировок. Если вы работаете на другой платформе, то лучше пользоваться вариантом
std::shared_ptr<>
, приведенным в листинге 7.9, потому что в
std::atomic<>
для гарантирования атомарности используется мьютекс, когда тип слишком велик и с помощью машинных команд обеспечить атомарность невозможно (а, следовательно, ваш алгоритм, якобы «свободный от блокировок», на самом теле таковым не является). Можно поступить и по-другому — если вы готовы ограничить размер счетчика и знаете, что на данной платформе в указателе есть неиспользуемые биты (например, потому что адресное пространство представлено 48 битами, а под указатель отводится 64 бита), то счетчик можно хранить в незанятых битах указателя, и тогда оба поля поместятся в одно машинное слово. Но для таких трюков нужно хорошо знать особенности платформы, так что в этой книге мы их обсуждать не будем.

Функция

push()
относительно проста (5). Мы конструируем объект
counted_node_ptr
, ссылающийся на только что выделенный узел
node
с ассоциированными данными, и в поле
next
узла
node
записываем текущее значение
head
. Затем с помощью
compare_exchange_weak()
устанавливаем новое значение
head
, как и раньше. Внутренний счетчик
internal_count
инициализируется нулем, а внешний
external_count
— единицей. Поскольку узел новый, на него пока существует только одна внешняя ссылка (из самого указателя
head
).

Как обычно, все сложности сосредоточены в реализации

pop()
, показанной в следующем листинге.


Листинг 7.11. Выталкивание узла из свободного от блокировок стека с разделённым счётчиком ссылок

template

class lock_free_stack {

private:

 void increase_head_count(counted_node_ptr& old_counter) {

 counted_node_ptr new_counter;

  do {

  new_counter = old_counter;

  ++new_counter.external_count;

  }

  while (

  !head.compare_exchange_strong(old_counter, new_counter));←
(1)

  old_counter.external_count = new_counter.external_count;

 }


public:

 std::shared_ptr pop() {

  counted_node_ptr old_head = head.load();

  for (;;) {

  increase_head_count(old_head);

  node* const ptr = old_head.ptr;←
(2)

  if (!ptr) {

   return std::shared_ptr();

  }

  if (head.compare_exchange_strong(old_head, ptr->next)) {←
(3)

   std::shared_ptr res;

   res.swap(ptr->data);                  ←
(4)

   int const count_increase = old_head.external_count - 2;←
(5)

   if (ptr->internal_count.fetch_add(count_increase) ==

     -count_increase) {                 ←
(6)

   delete ptr;

   }

   return res; ←
(7)

  } else if (ptr->internal_count.fetch_sub(1) == 1) {

   delete ptr; ←
(8)

  }

  }

 }

};

Теперь, загрузив значение

head
, мы должны сначала увеличить счетчик внешних ссылок на узел
head
, показав, что ссылаемся на него, — только тогда его можно безопасно будет разыменовывать. Если попытаться разыменовать указатель до увеличения счетчика ссылок, то вполне может случиться так, что другой поток освободит узел раньше, чем мы успеем обратиться к нему, и, стало быть, оставит нам висячий указатель. Именно в этом главная причина использования разделенного счетчика ссылок: увеличивая внешний счетчик ссылок, мы гарантируем, что указатель останется действительным в течение всего времени работы с ним. Увеличение производится в цикле по
compare_exchange_strong()
(1), где устанавливаются все поля структуры, чтобы быть уверенным, что другой поток не изменил в промежутке указатель.

Увеличив счетчик, мы можем без опаски разыменовать поле

ptr
объекта, загруженного из
head
, и получить тем самым доступ к адресуемому узлу (2). Если оказалось, что указатель нулевой, то мы находимся в конце списка — больше записей нет. В противном случае мы можем попытаться исключить узел из списка, выполнив
compare_exchange_strong()
с головным узлом
head
(3).

Если

compare_exchange_strong()
возвращает
true
, то мы приняли на себя владение узлом и можем с помощью функции
swap()
вытащить из него данные, которые впоследствии вернём (4). Тем самым гарантируется, что данные случайно не изменятся, если вдруг другие обращающиеся к стеку другие потоки удерживают указатели на этот узел. Затем можно прибавить внешний счетчик к внутреннему с помощью атомарной операции
fetch_add
(6). Если теперь счетчик ссылок стал равен нулю, то предыдущее значение (то, которое возвращает
fetch_add
) было противоположно только что прибавленному, и тогда узел можно удалять. Важно отметить, что прибавленное значение на самом деле на 2 меньше внешнего счетчика (5); мы исключили узел из списка, вследствие чего значение счетчика уменьшилось на 1, и больше не обращаемся к узлу из данного потока, что дает уменьшение еще на 1. Неважно, удаляется узел или нет, наша работа закончена, и мы можем вернуть данные (7).

Если сравнение с обменом (3) не проходит, значит, другой поток сумел удалить узел раньше нас, либо другой поток добавил в стек новый узел. В любом случае нужно начать с начала — с новым значением

head
, которое вернула функция
compare_exchange_strong()
. Но прежде необходимо уменьшить счетчик ссылок на узел, который мы пытались исключить раньше. Этот поток больше не будет к нему обращаться. Если наш поток — последний, удерживавший ссылку на этот узел (потому что другой поток вытолкнул его из стека), то внутренний счетчик ссылок равен 1, так что после вычитания 1 он обратится в нуль. В таком случае мы можем удалить узел прямо здесь, не дожидаясь перехода к следующей итерации цикла (8).

До сих мы задавали для всех атомарных операций упорядочение доступа к памяти

std::memory_order_seq_cst
. В большинстве систем это самый неэффективный режим с точки зрения времени выполнения и накладных расходов на синхронизацию, причем в ряде случаев разница весьма ощутима. Но теперь, определившись с логикой структуры данных, можно подумать и о том, чтобы ослабить некоторые требования к упорядочению, — все-таки не хотелось бы, чтобы пользователи стека несли лишние расходы. Итак, перед тем как расстаться со стеком и перейти к проектированию свободной от блокировок очереди, еще раз присмотримся к операциям стека и спросим себя, нельзя ли где-нибудь использовать более слабое упорядочение доступа, сохранив тот же уровень безопасности?

7.2.5. Применение модели памяти к свободному от блокировок стеку

Прежде чем менять схему упорядочения, нужно исследовать все операции и определить, какие между ними должны быть отношения. Затем можно будет вернуться и найти минимальное упорядочение, которое эти отношения обеспечивает. Чтобы это сделать, потребуется взглянуть на ситуацию с точки зрения потоков в нескольких разных сценариях. Простейший сценарий возникает, когда один поток помещает элемент данных в стек, а другой через некоторое время извлекает его оттуда, с него и начнем.

В этом сценарии есть три существенных участника. Во-первых, структура

counted_node_ptr
, используемая для передачи данных — узла
head
. Во-вторых, структура
node
, на которую
head
ссылается. И, в-третьих, сами данные, на которые указывает узел.

Поток, выполняющий

push()
, сначала конструирует элемент данных и объект
node
, затем устанавливает
head
. Поток, выполняющий
pop()
, сначала загружает значение
head
, затем в цикле сравнения с обменом увеличивает хранящийся в нем счетчик ссылок, после чего читает структуру
node
, чтобы извлечь из нее значение
next
. Из этой последовательности можно вывести требуемое отношение; значение
next
— простой неатомарный объект, поэтому для его безопасного чтения должно существовать отношение происходит-раньше между операциями сохранения (в заталкивающем потоке) и загрузки (в выталкивающем потоке). Поскольку в
push()
имеется единственная атомарная операция —
compare_exchange_weak()
, а для существования отношения происходит-раньше между потоками нам нужна операция освобождения (release), то для функции
compare_exchange_weak()
необходимо задать упорядочение
std::memory_order_release
или более сильное. Если
compare_exchange_weak()
вернула
false
, то ничего не было изменено, и мы можем продолжить цикл, следовательно в этом случае нужна только семантика
std::memory_order_relaxed
:

void push(T const& data) {

 counted_node_ptr new_node;

 new_node.ptr = new node(data);

 new_node.external_count = 1;

 new_node.ptr->next = head.load(std::memory_order_relaxed);

 while (!head.compare_exchange_weak(

  new_node.ptr->next, new_node,

  std::memory_order_release, std::memory_order_relaxed));

}

А что можно сказать о коде

pop()
? Чтобы получить желаемое отношение происходит-раньше, перед доступом к
next
необходима операция с семантикой
std::memory_order_acquire
или более сильной. Указатель, который разыменовывается для доступа к полю
next
, — это прежнее значение, прочитанное операцией
compare_exchange_strong()
в
increase_head_count()
, поэтому указанная семантика нужна в случае успеха. Как и в
push()
, если обмен закончился неудачно, мы просто повторяем цикл, поэтому для отказа можно задать ослабленное упорядочение:

void increase_head_count(counted_node_ptr& old_counter) {

 counted_node_ptr new_counter;

 do {

  new_counter = old_counter;

  ++new_counter.external_count;

 }

 while (!head.compare_exchange_strong(

  old_counter, new_counter,

  std::memory_order_acquire, std::memory_order_relaxed));

 old_counter.external_count = new_counter.external_count;

}

Если вызов

compare_exchange_strong()
завершается успешно, то мы знаем, что раньше в поле
ptr
прочитанного значения находилось то, что теперь хранится в переменной
old_counter
. Поскольку сохранение в
push()
было операцией освобождения, а данный вызов
compare_exchange_strong()
— операция захвата, то сохранение синхронизируется-с загрузкой, и мы имеем отношение происходит-раньше. Следовательно, сохранение в поле
ptr
в
push()
происходит-раньше доступа к
ptr->next
в
pop()
, и мы в безопасности.

Отметим, что для этого анализа порядок доступа к памяти в начальном вызове

head.load()
не имел значения, поэтому в нем безопасно задать семантику
std::memory_order_relaxed
.

Далее на очереди операция

compare_exchange_strong()
, которая записывает в
head
значение
old_head.ptr->next
. Нужно ли наложить на нее какие-нибудь ограничения, чтобы гарантировать целостность данных в этом потоке? Если обмен завершается успешно, то мы обращаемся к
ptr->data
, поэтому должны быть уверены, что сохранение
ptr->data
в потоке, выполняющем
push()
, происходит-раньше загрузки. Но такая уверенность уже есть: операция захвата в
increase_head_count()
гарантирует, что существует отношение синхронизируется-с между сохранением в потоке, выполняющем
push()
, и операцией сравнения с обменом. Поскольку сохранение данных в потоке, выполняющем
push()
, расположено перед сохранением
head
, а вызов
increase_head_count()
расположен перед загрузкой
ptr->data
, то отношение происходит-раньше имеет место, и всё будет хорошо, даже если для операции сравнения с обменом в
pop()
задана семантика
std::memory_order_relaxed
. Есть еще всего одно место, где изменяется
ptr->data
— тот самый вызов
swap()
, на который вы сейчас смотрите, и ни один другой поток не может оперировать тем же узлом — в этом и заключается смысл сравнения с обменом.

Если

compare_exchange_strong()
завершается неудачно, то к новому значению
old_head
не будет обращений до следующей итерации цикла, и, поскольку мы уже решили, что семантики
std::memory_order_acquire
хватало в
increase_head_count()
, то здесь будет достаточно
std::memory_order_relaxed
.

Что можно сказать насчёт других потоков? Нужны ли более сильные ограничения, чтобы и другие потоки работали безопасно? Нет, не нужны, потому что

head
модифицируется только операциями сравнения с обменом. Будучи операциями чтения-модификации-записи, они составляют часть последовательности освобождений, начатой операцией сравнения с обменом в
push()
. Поэтому
compare_exchange_weak()
в
push()
синхронизируется-с операцией
compare_exchange_strong()
в
increase_head_count()
, которая прочитает сохраненное значение, даже если в промежутке другие потоки изменят
head
.

Мы почти закончили, осталось только рассмотреть функции, в которых используются операции

fetch_add()
, изменяющие счетчик ссылок. Поток, который добрался до возврата данных из узла, может продолжать в твердой уверенности, что никакой другой поток не сможет модифицировать хранящиеся в узле данные. Однако любой поток, который потерпел неудачу при извлечении данных, знает, что какой-то другой поток данные в узле модифицировал; он использовал функцию
swap()
для извлечения данных. Следовательно, чтобы предотвратить гонку за данными мы должны гарантировать, что
swap()
происходит-раньше
delete
. Чтобы добиться этого, проще всего задать семантику
std::memory_order_release
при вызове
fetch_add()
в ветви, где мы возвращаем данные, и семантику
std::memory_order_acquire
— в ветви, где мы возвращаемся в начало цикла. Однако даже это перебор — лишь один поток выполняет
delete
(тот, что сбросил счетчик в нуль), поэтому только этому потоку нужно выполнить операцию захвата. К счастью, поскольку
fetch_add()
— операция чтения-модификации-записи, то она составляет часть последовательности освобождений, поэтому для достижения цели нам достаточно дополнительной операции
load()
. Если в ветви, где происходит возврат в начало цикла, счетчик ссылок уменьшается до нуля, то здесь же можно перезагрузить счетчик ссылок с семантикой
std::memory_order_acquire
, чтобы обеспечить требуемое отношение синхронизируется-с, а в самой операции
fetch_add()
достаточно задать
std::memory_order_relaxed
. Окончательная реализация стека с новой версией
pop()
приведена ниже.


Листинг 7.12. Свободный от блокировок стек с подсчётом ссылок и ослабленными атомарными операциями

template

class lock_free_stack {

private:

 struct node;


 struct counted_node_ptr {

  int external_count;

  node* ptr;

 };


 struct node {

  std::shared_ptr data;

  std::atomic internal_count;

  counted_node_ptr next;

  node(T const& data_):

  data(std::make_shared(data_)), internal_count(0) {}

 };


 std::atomic head;


 void increase_head_count(counted_node_ptr& old_counter) {

  counted_node_ptr new_counter;


  do {

  new_counter = old_counter;

  ++new_counter.external_count;

  }

  while (!head.compare_exchange_strong(old_counter, new_counter,

     std::memory_order_acquire,

     std::memory_order_relaxed));


 old_counter.external_count = new_counter.external_count;

 }


public:

 ~lock_free_stack() {

  while(pop());

 }


 void push(T const& data) {

  counted_node_ptr new_node;

  new_node.ptr = new node(data);

  new_node.external_count = 1;

  new_node.ptr->next = head.load(std::memory_order_relaxed);

  while (!head.compare_exchange_weak(

      new_node.ptr->next, new_node,

      std::memory_order_release,

      std::memory_order_relaxed));

 }


 std::shared_ptr pop() {

  counted_node_ptr old_head =

  head.load(std::memory_order_relaxed);

  for (;;) {

  increase_head_count(old_head);

  node* const ptr = old_head.ptr;

  if (!ptr) {

   return std::shared_ptr();

  }

  if (head.compare_exchange_strong(old_head, ptr->next,

    std::memory_order_relaxed)) {

   std::shared_ptr res;

   res.swap(ptr->data);

   int const count_increase = old_head.external_count — 2;

   if (ptr->internal_count.fetch_add(count_increase,

     std::memory_order_release) == -count_increase) {

   delete ptr;

   }

   return res;

  }

  else if (ptr->internal_count.fetch_add(-1,

       std::memory_order_relaxed) == 1) {

   ptr->internal_count.load(std::memory_order_acquire);

   delete ptr;

  }

  }

 }

};

Мы немало потрудились, но наконец-то дошли до конца, и стек теперь стал куда лучше. За счет тщательно продуманного применения ослабленных операций нам удалось повысить производительность, не жертвуя корректностью. Как видите, реализация

pop()
теперь насчитывает 37 строк вместо 8 в эквивалентной реализации
pop()
для стека с блокировками (листинг 7.1) и 7 строк для простого свободного от блокировок стека без управления памятью (листинг 7.2). При рассмотрении свободной от блокировок очереди мы встретимся с аналогичной ситуацией: сложность кода в значительной степени обусловлена именно управлением памятью.

7.2.6. Потокобезопасная очередь без блокировок

Очередь отличается от стека прежде всего тем, что операции

push()
и
pop()
обращаются к разным частям структуры данных, тогда как в стеке та и другая работают с головным узлом списка. Следовательно, и проблемы синхронизации тоже другие. Требуется сделать так, чтобы изменения, произведенные на одном конце, были видны при доступе с другого конца. Однако структура функции
try_pop()
в листинге 6.6 не так уж сильно отличается от структуры
pop()
в простом свободном от блокировок стеке в листинге 7.2, поэтому можно с достаточными основаниями предположить, что и весь свободный от блокировок код будет схожим. Посмотрим, так ли это.

Если взять листинг 6.6 за основу, то нам понадобятся два указателя на

node
: один для головы списка (
head
), второй — для хвоста (
tail
). Поскольку мы собираемся обращаться к ним из нескольких потоков, то надо бы сделать эти указатели атомарными и расстаться с соответствующими мьютексами. Начнём с этого небольшого изменения и посмотрим, куда оно нас приведет. Результат показан в листинге ниже.


Листинг 7.13. Свободная от блокировок очередь с одним производителем и одним потребителем

template

class lock_free_queue {

private:

 struct node {

  std::shared_ptr data;

  node* next;

  node():

  next(nullptr) {}

 };


 std::atomic head;

 std::atomic tail;

 node* pop_head() {

  node* const old_head = head.load();

 if (old_head == tail.load()) {←
(1)

  return nullptr;

  }

 head.store(old_head->next);

  return old_head;

 }


public:

 lock_free_queue():

  head(new node), tail(head.load()) {}


 lock_free_queue(const lock_free_queue& other) = delete;

 lock_free_queue& operator=(

  const lock_free_queue& other) = delete;


 ~lock_free_queue() {

  while(node* const old_head = head.load()) {

  head.store(old_head->next);

  delete old_head;

  }

 }


 std::shared_ptr pop() {

  node* old_head = pop_head();

  if (!old_head) {

  return std::shared_ptr();

  }


  std::shared_ptr const res(old_head->data);←
(2)

  delete old_head;

  return res;

 }


 void push(T new_value) {

  std::shared_ptr new_data(std::make_shared(new_value));

  node* p = new node;         ←
(3)

  node* const old_tail = tail.load(); ←
(4)

  old_tail->data.swap(new_data);    ←
(5)

  old_tail->next = p;         ←
(6)

  tail.store(p);            ←
(7)

 }

};

На первый взгляд, неплохо, и если в каждый момент времени существует только один поток, вызывающий

push()
, и только один поток, вызывающий
pop()
, то вообще всё прекрасно. Важно отметить, что в этом случае существует отношение происходит-раньше между
push()
и
pop()
, благодаря которому извлечение данных безопасно. Сохранение
tail
(7) синхронизируется-с загрузкой
tail
(1), сохранение указателя на
data
в предыдущем узле (5) расположено перед сохранением
tail
, а загрузка
tail
расположена перед загрузкой указателя на
data
(2), поэтому сохранение
data
происходит раньше его загрузки, и всё замечательно. Таким образом, мы получили корректно обслуживаемую очередь с одним производителем и одним потребителем.

Проблемы начинаются, когда несколько потоков вызывают

push()
или
pop()
одновременно. Сначала рассмотрим
push()
. Если два потока одновременно вызывают
push()
, то оба выделяют память для нового фиктивного узла (3), оба читают одно и то же значение
tail
(4) и, следовательно, оба изменяют данные-члены
data
и
next
одного и того же узла (5), (6). А это уже гонка за данными!

Аналогичные проблемы возникают в

pop_head()
. Если два потока вызывают эту функцию одновременно, то оба читают одно и то же значение
head
, и оба перезаписывают старое значение одним и тем же указателем
next
. Оба потока теперь думают, что получили один и тот же узел, — прямой путь к катастрофе. Мы должны не только сделать так, чтобы лишь один поток извлекал данный элемент, но и позаботиться о том, чтобы другие потоки могли безопасно обращаться к члену
next
узла, который прочитали из
head
. Это точно та же проблема, с которой мы сталкивались при написании
pop()
для свободного от блокировок стека, поэтому и любое из предложенных тогда решений можно применить здесь.

Итак, проблему

pop()
можно считать решенной, но как быть с
push()
? Здесь трудность заключается в том, что для получения требуемого отношения происходит-раньше между
push()
и
pop()
мы должны заполнить поля фиктивного узла до обновления
tail
. Но это означает, что одновременные вызовы
push()
конкурируют за те же самые данные, так как был прочитал один и тот же указатель
tail
.

Решение проблемы нескольких потоков в
push()

Один из способов — добавить фиктивный узел между реальными. Тогда единственной частью текущего узла

tail
, нуждающейся в обновлении, будет указатель
next
, который, следовательно, можно было бы сделать атомарным. Если потоку удалось записать в
next
указатель на свой новый узел вместо
nullptr
, то он успешно добавил узел; в противном случае ему придется начать сначала и снова прочитать
tail
. Это потребует небольшого изменения в
pop()
— нужно будет игнорировать узлы с нулевым указателем на данные и возвращаться в начало цикла. Недостаток этого решения в том, что при каждом вызове
pop()
придется как правило исключать из списка два узла и производить в два раза больше операций выделения памяти.

Второй способ — сделать указатель

data
атомарным и устанавливать его с помощью операции сравнения с обменом. Если она завершится успешно, то мы получили свой хвостовой узел и можем безопасно записать в
next
указатель на наш новый узел, а затем обновить
tail
. Если же сравнение с обменом завершается неудачно, потому что другой поток успел сохранить данные, мы возвращаемся в начало цикла, заново читаем
tail
и пробуем снова. Если атомарные операции над s
td::shared_ptr<>
свободны от блокировок, то дело сделано. Если нет, нужна альтернатива. Можно, например, заставить
pop()
возвращать
std::unique_ptr<>
(в конце концов, это ведь единственная ссылка на объект) и сохранять данные в очереди в виде простого указателя. Тогда его можно было бы хранить как
std::atomic
и впоследствии обновлять с помощью
compare_exchange_strong()
. Если воспользоваться для поддержки нескольких потоков в
pop()
схемой подсчета ссылок из листинга 7.11, то
push()
будет выглядеть следующим образом.


Листинг 7.14. Первая (неудачная) попытка переработки

push()

void push(T new_value) {

 std::unique_ptr new_data(new T(new_value));

 counted_node_ptr new_next;

 new_next.ptr = new node;

 new_next.external_count = 1;

 for (;;) {

  node* const old_tail = tail.load();←
(1)

  T* old_data = nullptr;

  if (old_tail->data.compare_exchange_strong(

  old_data, new_data.get())) {    ←
(2)

  old_tail->next = new_next;

  tail.store(new_next.ptr);     ←
(3)

  new_data.release();

  break;

  }

 }

}

Применение схемы подсчета ссылок устраняет эту конкретную гонку, но в

push()
имеются и другие гонки. Взглянув на переработанную версию
push()
в листинге 7.14, вы обнаружите ту же ситуацию, что уже встречалась нам в стеке: загрузка атомарного указателя (1) и разыменование этого указателя (2). В промежутке между этими двумя операциями другой поток может изменить указатель (3), что в конечном итоге приведет к освобождению памяти, запятой узлом (в
pop()
). Если это произойдет раньше, чем мы разыменовываем указатель, то получится неопределенное поведение. Ой! Возникает искушение добавить в
tail
внешний счетчик, как мы уже поступили для
head
, однако на каждый узел уже имеется внешний счетчик в указателе
next
в предыдущем узле очереди. Если хранить два внешних счетчика для одного узла, то потребуется модифицировать схему подсчета ссылок, чтобы не удалить узел преждевременно. Проблему можно решить, подсчитывая число внешних счетчиков в структуре
node
и уменьшая это число при уничтожении внешнего счетчика (одновременно с прибавлением значения внешнего счетчика к значению внутреннего). Если внутренний счетчик равен нулю, а внешних не осталось, то узел можно удалять. Эту технику я впервые встретил в проекте Джо Сейга (Joe Seigh) Atomic Ptr Plus[16]. В следующем листинге показано, как выглядит
push()
при использовании такой схемы.


Листинг 7.15. Реализация

push()
для очереди без блокировок с подсчётом ссылок на
tail

template

class lock_free_queue {

private:

 struct node;


 struct counted_node_ptr {

  int external_count;

  node* ptr;

 };


 std::atomic head;

 std::atomic tail;←
(1)


 struct node_counter {

  unsigned internal_count:30;

  unsigned external_counters:2;←
(2)

 };


 struct node {

  std::atomic data;

  std::atomic count;←
(3)

 counted_node_ptr next;

  node() {

  node_counter new_count;

  new_count.internal_count = 0;

  new_count.external_counters = 2;←
(4)

  count.store(new_count);

  next.ptr = nullptr;

  next.external_count = 0;

  }

 };


public:

 void push(T new_value) {

  std::unique_ptr new_data(new T(new_value));

  counted_node_ptr new_next;

  new_next.ptr = new node;

  new_next.external_count = 1;

  counted_node_ptr old_tail = tail.load();


  for (;;) {

  increase_external_count(tail, old_tail);    ←
(5)

  T* old_data = nullptr;

  if (old_tail.ptr->data.compare_exchange_strong(←
(6)

    old_data, new_data.get())) {

   old_tail.ptr->next = new_next;

   old_tail = tail.exchange(new_next);

   free_external_counter(old_tail);       ←
(7)

   new_data.release();

   break;

  }

  old_tail.ptr->release_ref();

  }

 }

};

В листинге 7.15

tail
теперь имеет такой же тип
atomic
, как и
head
(1), а в структуру
node
добавлен член
count
вместо прежнего
internal_count
(3). Член
count
сам представляет собой структуру с двумя полями:
internal_count
и
external_counters
(2). Под поле
external_counters
отведено только 2 бита, потому что внешних счетчиков может быть не более двух. Воспользовавшись битовыми полями и отведя под
internal_count
30 бит, мы ограничили длину поля счетчика 32 битами. В результате мы убиваем сразу двух зайцев: и значение внутреннего счетчика может быть достаточно велико, и вся структура помещается в машинное слово на 32- и 64-разрядных машинах. Очень важно изменять счетчики как единое целое, чтобы избежать гонки. Как это делается, мы покажем чуть ниже. На многих платформах хранение структуры в одном машинном слове повышает шансы на то, что атомарные операции окажутся свободными от блокировок.

При инициализации структуры

node
в поле
internal_count
записывается 0, а в поле
external_counters
— 2 (4), потому что сразу после добавления нового узла в очередь на него есть две ссылки: из
tail
и из указателя
next
в предыдущем узле. Код самой функции
push()
похож на приведенный в листинге 7.14 с тем отличием, что перед тем как разыменовывать загруженное из
tail
значение, чтобы вызвать
compare_exchange_strong()
для члена узла
data
(6), мы вызываем новую функцию
increase_external_count()
которая увеличивает счетчик (5), а затем функцию
free_external_counter()
для старого хвоста
old_tail
(7).

Разобравшись с

push()
, обратим наши взоры на
pop()
. В ее коде (см. листинг 7.16) логика подсчета ссылок из реализации
pop()
в листинге 7.11 комбинируется с логикой извлечения из очереди в листинге 7.13.


Листинг 7.16. Извлечение узла из очереди без блокировок с подсчётом ссылок на

tail

template

class lock_free_queue {

private:

 struct node {

 void release_ref();

};


public:

 std::unique_ptr pop() {

  counted_node_ptr old_head =

   head.load(std::memory_order_relaxed);  ←
(1)

  for (;;) {

  increase_external_count(head, old_head);←
(2)

  node* const ptr = old_head.ptr;

  if (ptr == tail.load().ptr) {

   ptr->release_ref();←
(3)

   return std::unique_ptr();

  }

  if (head.compare_exchange_strong(old_head, ptr->next)) {←
(4)

   T* const res = ptr->data.exchange(nullptr);

   free_external_counter(old_head);←
(5)

   return std::unique_ptr(res);

  }

  ptr->release_ref();

  }

 }

};

Все начинается с загрузки значения

old_head
перед входом в цикл (1) и до увеличения внешнего счетчика в загруженном значении (2). Если узел
head
совпадает с
tail
, то можно освободить ссылку (3) и вернуть нулевой указатель, потому что очередь пуста. Если же в очереди есть данные, то мы пытаемся заявить на них свои права с помощью
compare_exchange_strong()
(4). Как и в случае стека в листинге 7.11, мы при этом сравниваем внешний счетчик и указатель как единое целое; если хотя бы один из них изменился, то мы должны вернуться в начало цикла, освободив предварительно ссылку 6. Если обмен завершился удачно, то мы получили в свое распоряжение данные в узле, поэтому можем вернуть их вызывающей программе, освободив предварительно внешний счетчик ссылок на извлеченный узел (5). После того как оба внешних счетчика освобождены, а внутренний счетчик обратился в нуль, сам узел можно удалять. Вспомогательные функции подсчета ссылок приведены в листингах 7.17, 7.18 и 7.19.


Листинг 7.17. Освобождение ссылки на узел в очереди без блокировок

template

class lock_free_queue {

private:

 struct node {

  void release_ref() {

  node_counter old_counter =

   count.load(std::memory_order_relaxed);

  node_counter new_counter;

  do {

   new_counter = old_counter;

   --new_counter.internal_count;     ←
(1)

  }

  while (!count.compare_exchange_strong(←
(2)

     old_counter, new_counter,

      std::memory_order_acquire, std::memory_order_relaxed));


  if (

   !new_counter.internal_count &&

   !new_counter.external_counters) {

   delete this; ←
(3)

  }

  }

 };

};

Реализация

node::release_ref()
лишь немногим отличается от аналогичного кода в
lock_free_stack::pop()
(см. листинг 7.11). Там мы работали с единственным внешним счетчиком, поэтому достаточно было вызвать
fetch_sub
. Здесь же необходимо атомарно обновить всю структуру
count
, хотя в действительности мы хотим модифицировать только поле
internal_count
(1). Поэтому никуда не деться от цикла сравнения с обменом (2). Если после уменьшения
internal_count
оказалось, что и внутренний, и внешний счетчик равны нулю, то это была последняя ссылка, и мы можем удалять узел (3).


Листинг 7.18. Получение новой ссылки на узел в очереди без блокировок

template

class lock_free_queue {

private:

 static void increase_external_count(

  std::atomic& counter,

  counted_node_ptr& old_counter) {

  counted_node_ptr new_counter;

  do {

  new_counter = old_counter;

  ++new_counter.external_count;

  }

  while (!counter.compare_exchange_strong(

  old_counter, new_counter,

  std::memory_order_acquire, std::memory_order_relaxed));


 old_counter.external_count = new_counter.external_count;

 }

};

Листинг 7.18 завершает картину. На этот раз мы не освобождаем ссылку, а получаем новую и увеличиваем внешний счетчик. Функция

increase_external_count()
аналогична
increase_head_count()
из листинга 7.12, отличаясь от нее тем, что преобразована в статическую функцию-член, которая принимает подлежащий обновлению внешний счетчик извне, а не оперирует внутренним членом класса.


Листинг 7.19. Освобождение счётчика внешних ссылок на узел в очереди без блокировок

template

class lock_free_queue {

private:

 static void free_external_counter(

  counted_node_ptr &old_node_ptr) {

  node* const ptr = old_node_ptr.ptr;

  int const count_increase = old_node_ptr.external_count — 2;

  node_counter old_counter =

  ptr->count.load(std::memory_order_relaxed);

  node_counter new_counter;

  do {

  new_counter = old_counter;

  --new_counter.external_counters;       ←
(1)

  new_counter.internal_count += count_increase;←
(2)

  }

  while (!ptr->count.compare_exchange_strong(  ←
(3)

  old_counter, new_counter,

  std::memory_order_acquire, std::memory_order_relaxed));


  if (!new_counter.internal_count &&

    !new_counter.external_counters) {

  delete ptr;←
(4)

  }

 }

};

Функция

free_external_counter()
дополняет
increase_external_count()
. Она аналогична эквивалентной функции из реализации
lock_free_stack::pop()
в листинге 7.11, но модифицировала с учетом появления поля
external_counters
. Она обновляет оба счетчика в одном вызове
compare_exchange_strong()
для всей структуры
count
(3) — точно так же мы поступали при уменьшении
internal_count
в
release_ref()
. Значение
internal_count
обновляется, как в листинге 7.11 (2), a
external_counters
уменьшается на единицу (1). Если теперь оба значения равны нулю, значит, ссылок на узел не осталось, поэтому его можно удалять (4). Оба обновления необходимо выполнять в одном действии (потому и нужен цикл сравнения с обменом), чтобы избежать гонки. Если бы счетчики обновлялись порознь, то два разных потока могли бы решить, что владеют последней ссылкой на узел, и удалить его, что привело бы к неопределенному поведению.

Хотя теперь функция работает и свободна от блокировок, осталась еще одна проблема, касающаяся производительности. После того как один поток начал операцию

push()
, успешно выполнив
compare_exchange_strong()
от имени
old_tail.ptr->data
(точка (5) в листинге 7.15), никакой другой войти в
push()
не может. Попытавшись это сделать, поток увидит новое значение, отличное от
nullptr
, в результате чего вызов
compare_exchange_strong()
вернет
false
, и потоку придется начать цикл заново. Это активное ожидание, которое только потребляет время процессора, не продвигаясь вперед ни на йоту. По сути дела, это блокировка. Первый удачный вызов
push()
блокирует все остальные потоки, пока не завершится, так что этот код более не свободен от блокировок. Хуже того — обычно операционная система может отдать приоритет потоку, удерживающему мьютекс, если существуют заблокированные потоки, но только не в данном случае, поэтому остальные потоки так и будут пожирать процессорное время, пока первый не закончит. И тут мы вытащим на свет очередной припасенный для освобождения от блокировок трюк: ожидающий поток может помочь потоку, который выполняет
push()
.

Освобождение от блокировок одного потока с помощью другого

Чтобы вновь сделать код свободным от блокировок, нам нужно придумать, как ожидающий поток может продвигаться вперед, даже если поток, находящийся в

push()
, застрял. Один из способов — помочь застрявшему потоку, выполнив за него часть работы.

В данном случае мы точно знаем, что нужно сделать: указатель

next
в хвостовом узле требуется установить на новый фиктивный узел, и тогда сам указатель
tail
можно будет обновить. Все фиктивные узлы эквивалентны, поэтому не имеет значения, какой из них использовать — созданный потоком, который успешно поместил в очередь данные, или потоком, ожидающим входа в
push()
. Если сделать указатель
next
в узле атомарным, то для его установки можно будет применить
compare_exchange_strong()
. После того как указатель
next
установлен, в цикле по
compare_exchange_weak()
можно будет установить
tail
, проверяя при этом, указывает ли он по-прежнему на тот же самый исходный узел. Если это не так, значит, узел обновил какой-то другой поток, так что можно прекратить попытки и перейти в начало цикла. Реализация этой идеи потребует также небольшого изменения
pop()
, где нужно будет загрузить указатель
next
; эта модификация показана в листинге ниже.


Листинг 7.20. Модификация

pop()
с целью помочь при выполнении
push()

template

class lock_free_queue {

private:

 struct node {

  std::atomic data;

  std::atomic count;

  std::atomic next;←
(1)

 };


public:

 std::unique_ptr pop() {

  counted_node_ptr old_head =

   head.load(std::memory_order_relaxed);

  for (;;) {

  increase_external_count(head, old_head);

  node* const ptr = old_head.ptr;

  if (ptr == tail.load().ptr) {

   return std::unique_ptr();

  }

  counted_node_ptr next = ptr->next.load();←
(2)

  if (head.compare_exchange_strong(old_head, next)) {

   T* const res = ptr->data.exchange(nullptr);

   free_external_counter(old_head);

   return std::unique_ptr(res);

  }

  ptr->release_ref();

  }

 }

};

Как я и говорил, изменения тривиальны: указатель

next
теперь атомарный (1), поэтому операция
load
в точке (2) атомарна. В данном примере мы используем упорядочение по умолчанию
memory_order_seq_cst
, поэтому явное обращение к
load()
можно было бы опустить, полагаясь на операцию загрузки в операторе неявного преобразования к типу
counted_node_ptr
, но явное обращение будет напоминать нам, куда впоследствии добавить явное задание порядка обращения к памяти.


Листинг 7.21. Пример реализации функции

push()
, освобождаемой от блокировок благодаря помощи извне

template

class lock_free_queue {

private:

 void set_new_tail(counted_node_ptr &old_tail, ←
(1)

          counted_node_ptr const &new_tail) {

  node* const current_tail_ptr = old_tail.ptr;

 while (!tail.compare_exchange_weak(old_tail, new_tail) &&←
(2)

     old_tail.ptr == current_tail_ptr);

  if (old_tail.ptr == current_tail_ptr)←
(3)

  free_external_counter(old_tail);   ←
(4)

  else

  current_tail_ptr->release_ref();  ←
(5)

 }


public:

 void push(T new_value) {

  std::unique_ptr new_data(new T(new_value));

  counted_node_ptr new_next;

  new_next.ptr = new node;

  new_next.external_count = 1;

  counted_node_ptr old_tail = tail.load();


 for (;;) {

  increase_external_count(tail, old_tail);

  T* old_data = nullptr;

  if (old_tail.ptr->data.compare_exchange_strong(  ←
(6)

   old_data, new_data.get())) {

   counted_node_ptr old_next = {0};

   if (!old_tail.ptr->next.compare_exchange_strong(←
(7)

     old_next, new_next)) {

   delete new_next.ptr;←
(8)

   new_next = old_next;←
(9)

   }

   set_new_tail(old_tail, new_next);

   new_data.release();

   break;

  } else {       ←
(10)

   counted_node_ptr old_next = {0};

   if (old_tail.ptr->next.compare_exchange_strong(←
(11)

     old_next, new_next)) {

   old_next = new_next;   ←
(12)

   new_next.ptr = new node;←
(13)

   }

   set_new_tail(old_tail, old_next);←
(14)

  }

  }

 }

};

В целом похоже на исходную версию

push()
из листинга 7.15, но есть и несколько принципиальных отличий. Если указатель
data
действительно установлен (6), то нужно обработать случай, когда нам помог другой поток, и кроме того появилась ветвь
else
, в которой один поток оказывает помощь другому (10).

Установив указатель

data
в узле (6), новая версия
push()
изменяет указатель
next
, вызывая
compare_exchange_strong()
(7). Мы используем
compare_exchange_strong()
, чтобы избежать цикла. Если обмен завершился неудачно, значит, другой поток уже установил указатель
next
, поэтому нам ни к чему узел, выделенный в начале, и его можно удалить (8). Мы также хотим использовать значение
next
, установленное другим потоком, для обновления
tail
(9).

Собственно обновление указателя

tail
вынесено в отдельную функцию
set_new_tail()
(1). В ней мы используем цикл по
compare_exchange_weak()
(2), потому что если другие потоки пытаются поместить в очередь новый узел с помощью
push()
, то значение
external_count
может измениться, а нам не хотелось бы его потерять. Однако нужно позаботиться и о том, чтобы не затереть значение, которое другой поток уже успешно изменил, в противном случае в очереди могут возникнуть циклы, а это уже совершенно лишнее. Следовательно, нужно гарантировать, что часть
ptr
загруженного значения осталась той же самой, если сравнение с обменом не прошло. Если
ptr
не изменился после выхода из цикла (3), то мы успешно установили
tail
, поэтому старый внешний счетчик нужно освободить (4). Если значение
ptr
изменилось, то счетчик уже освобождён другим потоком, поэтому нам нужно только освободить ссылку, которую удерживает наш поток (5).

Если поток, вызвавший

push()
, не сумел установить указатель
data
на этой итерации цикла, то он может помочь более удачливому потоку завершить обновление. Сначала мы пытаемся записать в next указатель на новый узел, выделенный в этом потоке (11). Если это получилось, то выделенный нами узел будет использоваться как новый узел
tail
(12), а нам следует выделить еще один узел, поскольку поместить свои данные в очередь еще только предстоит (13). После этого мы можем попытаться установить узел
tail
, вызвав
set_new_tail
до перехода к очередной итерации (14).

Вероятно, вы обратили внимание на чрезмерно большое для такого крохотного фрагмента количество

new
и
delete
. Вызвало это тем, что новые узлы создаются в
push()
, а уничтожаются в
pop()
. Поэтому быстродействие этого кода существенно зависит от того, насколько эффективно работает распределитель памяти; плохой распределитель может полностью свести на нет свойства масштабируемости, присущие свободному от блокировок контейнеру. Вопрос о выборе и реализации подобных распределителей выходит за рамки данной книги, но имейте в виду, что единственный способ узнать, какой распределитель лучше, — испытывать и замерять производительность. К числу стандартных приемов оптимизации выделения памяти можно отнести создание отдельного распределителя в каждом потоке и использование списка свободных узлов — освободившиеся узлы помещаются в этот список, а не возвращаются распределителю.

Ну и, пожалуй, хватит примеров. Давайте теперь на основе вышеизложенного сформулируем ряд рекомендаций по написанию структур данных, свободных от блокировок.

7.3. Рекомендации по написанию структур данных без блокировок

Если вы тщательно прорабатывали все приведенные в этой главе примеры, то, наверное, оцепили, как трудно правильно написать код без блокировок. Если вы собираетесь проектировать собственные структуры данных, то не лишне будет иметь под рукой рекомендации, на которые можно опираться. Общие советы, относящиеся к параллельным структурам данных, приведенные в начале главы 6, остаются в силе, но этого мало. Из анализа примеров я извлек несколько полезных рекомендаций, к которым вы можете обращаться при проектировании структур данных, свободных от блокировок.

7.3.1. Используйте
std::memory_order_seq_cst
для создания прототипа

Порядок доступа к памяти

std::memory_order_seq_cst
гораздо проще для понимания и анализа, чем любой другой, потому что операции с такой семантикой полностью упорядочены. Во всех примерах из этой главы мы начинали с упорядочения
std::memory_order_seq_cst
и только потом ослабляли ограничения. В этом смысле использование других способов упорядочения можно считать оптимизацией, которой не следует заниматься преждевременно. В общем случае, для того чтоб определить, какие операции можно ослабить, нужно иметь перед глазами полный код, правильно работающий со структурой данных. Любая попытка пойти другим путем только осложнит вам жизнь. Проблема еще и в том, что тестирование кода может не выявить ошибок, но это еще не гарантирует их отсутствия. Если нет верификатора алгоритма, способного систематически тестировать все возможные сочетания видимости в разных потоках, которые совместимы с гарантиями, предоставляемыми заданными режимами упорядочения доступа к памяти (а такие программы существуют), то одного лишь прогона кода недостаточно.

7.3.2. Используйте подходящую схему освобождения памяти

Одна из самых сложных сторон написания свободного от блокировок кода — управление памятью. С одной стороны, требуется любой ценой предотвратить удаление объектов, на которые могут ссылаться другие потоки, а, с другой, удалять объекты как можно раньше, чтобы избежать чрезмерного расхода памяти. В этой главе мы познакомились с тремя методами, обеспечивающими безопасное освобождение памяти.

• Дождаться, когда к структуре данных не будет обращаться ни один поток, и удалить разом все объекты, ожидающие удаления.

• Использовать указатели опасности для выявления потока, обращающегося к конкретному объекту.

• Подсчитывать ссылки на объекты и не удалять их, пока ссылки существуют.

В любом случае идея заключается в том, чтобы каким-то образом отслеживать, сколько потоков обращается к объекту, и удалять его лишь в том случае, когда на него не осталось ни одной ссылки. Существует много методов освобождения памяти в структурах данных, свободных от блокировок. В частности, это идеальная ситуация для применения сборщика мусора. Придумывать алгоритмы гораздо проще, если знаешь, что сборщик мусора удалит узлы, когда они больше не используются, но не раньше.

Другой способ — использовать узлы повторно и полностью освобождать их только тогда, когда уничтожается вся структура данных. Раз узлы используются повторно, то память никогда не становится недействительной, поэтому некоторые проблемы, сопряженные с неопределенным поведением, вообще не возникают. Недостаток же в том, что взамен появляется так называемая проблема ABA.

7.3.3. Помните о проблеме ABA

Проблема ABA свойственна любому алгоритму, основанному на сравнении с обменом. Проявляется она следующим образом.

1. Поток 1 читает атомарную переменную

x
и обнаруживает, что она имеет значение
А
.

2. Поток 1 выполняет некоторую операцию, исходя из этого значения, например разыменовывает его (если это указатель), выполняет поиск или делает еще что-то.

3. Операционная система приостанавливает поток 1.

4. Другой поток выполняет некоторые операции с

x
, в результате которых ее значение изменяется и становится равным
B
.

5. Затем этот поток изменяет данные, ассоциированные со значением

A
, после чего значение, хранящееся в потоке 1, оказывается недействительным. Это может быть нечто кардинальное, например освобождение памяти, адресуемой указателем, или просто изменение какого-то ассоциированного значения.

6. Далее поток снова изменяет значение

x
на
A
, но уже с новыми данными. В случае указателя это может быть новый объект, который но случайному стечению обстоятельства имеет тот же адрес, что прежний.

7. Поток 1 возобновляется и выполняет сравнение с обменом для переменной

x
, сравнивая ее значение с
A
. Операция завершается успешно (потому что значение действительно равно
A
), но это уже не то
А
. Данные, ранее прочитанные потоком на шаге 2, более не действительны, но поток 1 ничего об этом не знает и повреждает структуру данных.

Ни один из представленных в этой главе алгоритмов не страдает от этой проблемы, но совсем нетрудно написать свободный от блокировок алгоритм, в котором она проявится. Самый распространенный способ избежать ее — хранить вместе с переменной

x
счетчик ABA. Операция сравнения с обменом тогда производится над комбинацией
x
и счетчика, как над единым целым. Всякий раз, как значение заменяется, счетчик увеличивается, поэтому даже если окончательное значение
x
не изменилось, сравнение с обменом не пройдёт, если другой поток в промежутке изменял
x
.

Проблема ABA особенно часто встречается в алгоритмах, в которых используются списки свободных узлов или иные способы повторного использования узлов вместо возврата их распределителю.

7.3.4. Выявляйте циклы активного ожидания и помогайте другим потокам

В последнем примере очереди мы видели, что поток, выполняющий операцию

push()
, должен дождаться, пока другой поток, выполняющий ту же операцию, завершит свои действия. Если ничего не предпринимать, то этот поток будет крутиться в цикле активного ожидания, впустую расходуя процессорное время и не продвигаясь ни на шаг. Наличие цикла ожидания в действительности эквивалентно блокирующей операции, а тогда с равным успехом можно было бы воспользоваться мьютексами. Модифицировав алгоритм таким образом, что ожидающий поток выполняет неполные шаги, если планировщик выделил ему время до завершения работы в исходном потоке, мы сможем устранить активное ожидание и сделать операцию неблокирующей. В примере очереди нам для этого потребовалось сделать одну переменную-член атомарной и использовать для ее установки операции сравнения с обменом, но в более сложных структурах данных могут понадобиться более обширные изменения.

Отталкиваясь от структур данных с блокировками, описанных в главе 6, мы в этой главе продемонстрировали простые реализации структур данных без блокировок на примере все тех же стека и очереди. Мы видели, как внимательно нужно подходить к упорядочению доступа к памяти в атомарных операциях, чтобы избежать гонок и гарантировать, что каждый поток видит непротиворечивое представление структуры данных. Мы также поняли, что управление памятью в структурах данных без блокировок оказывается значительно сложнее, чем в структурах с блокировками, и изучили несколько подходов к решению этой проблемы. Еще мы узнали, как предотвращать циклы активного ожидания, помогая завершить работу потоку, которого мы ждем.

Проектирование свободных от блокировок структур данных — сложная задача, при решении которой легко допустить ошибки, зато такие структуры обладают свойствами масштабируемости, незаменимыми в некоторых ситуациях. Надеюсь, что, проработав приведенные в этой главе примеры и ознакомившись с рекомендациями, вы будете лучше подготовлены к разработке собственных структур данных без блокировок, реализации алгоритмов, описанных в научных статьях, или к поиску ошибок, допущенных бывшим сотрудником компании.

Во всех случаях, когда некоторые данные разделяются между потоками, нужно задумываться о применяемых структурах данных и о синхронизации. Проектируя структуры данных с учетом параллелизма, вы сможете инкапсулировать ответственность в самой структуре, позволив основной программе сосредоточиться на решаемой задаче, а не на синхронизации доступа к данным. Этот подход будет продемонстрирован в главе 8, где мы перейдём от параллельных структур данных к написанию параллельных программ. В параллельных алгоритмах для повышения производительности используется несколько потоков, и выбор подходящей параллельной структуры данных приобретает решающее значение.

Загрузка...