Глава 6 Проектирование параллельных структур данных с блокировками

В этой главе:

■ Что понимается под проектированием структур данных, рассчитанных на параллельный доступ?

■ Рекомендации по проектированию таких структур.

■ Примеры реализации параллельных структур данных.

В предыдущей главе мы рассмотрели низкоуровневые детали атомарных операций и модели памяти. В этой главе мы на время отойдем от низкоуровневых деталей (чтобы вернуться к ним в главе 7) и поразмыслим о структурах данных.

От выбора структуры данных может в значительной степени зависеть всё решение поставленной задачи, и параллельное программирование — не исключение. Если к структуре данных планируется обращаться из разных потоков, то возможно два варианта: либо структура вообще неизменяемая, и тогда никакой синхронизации не требуется, либо программа спроектирована так, что любые изменения корректно синхронизированы. Одна из возможностей — завести отдельный мьютекс и пользоваться для защиты данных внешней по отношению к структуре блокировкой, применяя технику, рассмотренную в главах 3 и 4. Другая — спроектировать саму структуру данных, так чтобы к ней был возможен параллельный доступ.

При проектировании структуры данных, допускающей параллельный доступ, мы можем использовать основные строительные блоки многопоточных приложений, описанные в предыдущих главах, например мьютексы и условные переменные. На самом деле, вы уже видели примеры структур, в которых сочетание этих блоков гарантирует безопасный доступ из нескольких потоков.

Мы начнем эту главу с нескольких общих рекомендаций по проектированию параллельных структур данных. Затем мы еще раз вернемся к использованию блокировок и условных переменных в простых структурах, после чего перейдём к более сложным. В главе 7 я покажу, как с помощью атомарных операций можно строить структуры данных без блокировок.

Но довольно предисловий — посмотрим, что входит в проектирование структуры данных для параллельного программирования.

6.1. Что понимается под проектированием структур данных, рассчитанных на параллельный доступ?

На простейшем уровне это означает, что нужно спроектировать структуру данных, к которой смогут одновременно обращаться несколько потоков для выполнения одних и тех же или разных операций, причём каждый поток должен видеть согласованное состояние структуры. Данные не должны теряться или искажаться, все инварианты должны быть соблюдены, и никаких проблематичных состояний гонки не должно быть. Такая структура данных называется потокобезопасной. В общем случае структура данных безопасна только относительно определенных типов параллельного доступа. Не исключено, что несколько потоков могут одновременно выполнять какую-то одну операцию над структурой, а для выполнения другой необходим монопольный доступ. Наоборот, бывает, что несколько потоков могут одновременно и безопасно выполнять различные операции, но при выполнении одной и той же операции в разных потоках возникают проблемы.

Но проектирование с учетом параллелизма этим не исчерпывается: задача заключается в том, чтобы предоставить возможность распараллеливания потокам, обращающимся к структуре данных. По природе своей, мьютекс означает взаимное исключение: в каждый момент времени только один поток может захватить мьютекс. Следовательно, мьютекс защищает структуру данных, явным образом предотвращая истинно параллельный доступ к ней.

Это называется сериализацией: потоки обращаются к защищенным мьютексом данным по очереди, то есть последовательно, а не параллельно. Чтобы обеспечить истинно параллельный доступ, нужно тщательно продумывать внутреннее устройство структуры данных. Одни структуры данных оставляют больший простор для распараллеливания, чем другие, но идея остается неизменной: чем меньше защищаемая область, тем меньше операций приходится сериализовывать и тем больше потенциал для распараллеливания.

Прежде чем знакомиться с конкретными структурами данных, приведём несколько простых рекомендаций, полезных при проектировании с учетом параллелизма.

6.1.1. Рекомендации по проектированию структур данных для параллельного доступа

Как я уже отмечал, при проектировании структур данных для параллельного доступа нужно учитывать два аспекта: обеспечить безопасность доступа и разрешить истинно параллельный доступ. Как сделать структуру потокобезопасной, я уже рассказывал в главе 3.

• Гарантировать, что ни один поток не может увидеть состояние, в котором инварианты структуры данных нарушены действиями со стороны других потоков.

• Позаботиться о предотвращении состояний гонки, внутренне присущих структуре данных, предоставив такие функции, которые выполняли бы операции целиком, а не частями.

• Обращать внимание на том, как ведет себя структура данных при наличии исключений, — не допускать нарушения инвариантов и в этом случае.

• Минимизировать шансы возникновения взаимоблокировки, ограничивая область действия блокировок и избегая но возможности вложенных блокировок.

Прежде чем задумываться об этих деталях, важно решить, какие ограничения вы собираетесь наложить на использование структуры данных: если некоторый поток обращается к структуре с помощью некоторой функции, то какие функции можно в этот момент безопасно вызывать из других потоков?

Это на самом деле весьма важный вопрос. Обычно конструкторы и деструкторы нуждаются в монопольном доступе к структуре данных, но обязанность не обращаться к структуре до завершения конструирования или после начала уничтожения возлагается на пользователя. Если структура поддерживает присваивание, функцию

swap()
или копирующий конструктор, то проектировщик должен решить, безопасно ли вызывать эти операции одновременно с другими или пользователь должен обеспечить на время их выполнения монопольный доступ, хотя большинство других операций можно без опаски выполнять параллельно из разных потоков.

Второй аспект, нуждающийся в рассмотрении, — обеспечение истинно параллельного доступа. Тут я не могу предложить конкретных рекомендаций, а вместо этого перечислю несколько вопросов, которые должен задать себе проектировщик структуры данных.

• Можно ли ограничить область действия блокировок, так чтобы некоторые части операции выполнялись не под защитой блокировки?

• Можно ли защитить разные части структуры данных разными мьютексами?

• Все ли операции нуждаются в одинаковом уровне защиты?

• Можно ли с помощью простого изменения структуры данных расширить возможности распараллеливания, не затрагивая семантику операций?

В основе всех этих вопросов лежит одна и та же мысль: как свести к минимуму необходимую сериализацию и обеспечить максимально возможную степень истинного параллелизма? Часто бывает так, что структура данных допускает одновременный доступ из нескольких потоков для чтения, но поток, желающий модифицировать данные, должен получать монопольный доступ. Такое требование поддерживает класс

boost::shared_mutex
и ему подобные. Как мы скоро увидим, встречается и другой случай: поддерживается одновременный доступ из потоков, выполняющих различные операции над структурой, но потоки, выполняющие одну и ту же операцию, сериализуются.

В простейших потокобезопасных структурах данных обычно для защиты используются мьютексы и блокировки. Хотя, как мы видели в главе 3, им свойственны некоторые проблемы, но гарантировать с их помощью, что в каждый момент времени доступ к данным будет иметь только один поток, сравнительно легко. Мы будем знакомиться с проектированием потокобезопасных структур данных постепенно, и в этой главе рассмотрим только структуры на основе блокировок. А разговор о параллельных структурах данных без блокировок отложим до главы 7.

6.2. Параллельные структуры данных с блокировками

Проектирование параллельных структур данных с блокировками сводится к тому, чтобы захватить нужный мьютекс при доступе к данным и удерживать его минимально возможное время. Это довольно сложно, даже когда имеется только один мьютекс, защищающий всю структуру. Как мы видели в главе 3, требуется гарантировать, что к данным невозможно обратиться без защиты со стороны мьютекса и что интерфейс свободен от внутренне присущих состояний гонки. Если для защиты отдельных частей структуры применяются разные мьютексы, то проблема еще усложняется, поскольку в случае, когда некоторые операции требуют захвата нескольких мьютексов, появляется возможность взаимоблокировки. Поэтому к проектированию структуры данных с несколькими мьютексами следует подходить еще более внимательно, чем при наличии единственного мьютекса. В этом разделе мы применим рекомендации из раздела 6.1.1 к проектированию нескольких простых структур данных, защищаемых мьютексами. В каждом случае мы будем искать возможности повысить уровень параллелизма, обеспечивая в то же время потокобезопасность.

Начнем с реализации стека, приведённой в главе 3; это одна из самых простых структур данных, к тому же в ней используется всего один мьютекс. Но является ли она потокобезопасной? И насколько она хороша с точки зрения достижения истинного распараллеливания?

6.2.1. Потокобезопасный стек с блокировками

В следующем листинге воспроизведен код потокобезопасного стека из главы 3. Задача состояла в том, чтобы реализовать потокобезопасную структуру данных наподобие

std::stack<>
, которая поддерживала бы операции заталкивания и выталкивания.


Листинг 6.1. Определение класса потокобезопасного стека

#include 


struct empty_stack: std::exception {

 const char* what() const throw();

};


template

class threadsafe_stack {

private:

 std::stack data;

 mutable std::mutex m;

public:

 threadsafe_stack(){}

 threadsafe_stack(const threadsafe_stack& other) {

 std::lock_guard lock(other.m);

  data = other.data;

 }

 threadsafe_stack& operator=(const threadsafe_stack&) = delete;


 void push(T new_value) {

  std::lock_guard lock(m);

  data.push(std::move(new_value)); ←
(1)

 }


 std::shared_ptr pop() {

  std::lock_guard lock(m);

  if (data.empty()) throw empty_stack(); ←
(2)

  std::shared_ptr const res(

  std::make_shared(std::move(data.top())));←
(3)

  data.pop(); ←
(4)

  return res;

 }


 void pop(T& value) {

  std::lock_guard lock(m);

  if (data.empty()) throw empty_stack();

  value = std::move(data.top()); ←
(5)

  data.pop(); ←
(6)

 }


 bool empty() const {

  std::lock_guard lock(m);

  return data.empty();

 }

};

Посмотрим, как в этом случае применяются сформулированные выше рекомендации. Во-первых, легко видеть, что базовую потокобезопасность обеспечивает защита каждой функции-члена с помощью мьютекса

m
. Он гарантирует, что в каждый момент времени к данным может обращаться только один поток, поэтому если функции-члены поддерживают какие-то инварианты, то ни один поток не увидит их нарушения.

Во-вторых, существует потенциальная гонка между

empty()
и любой из функций
pop()
, но поскольку мы явно проверяем, что стек пуст, удерживая блокировку в
pop()
, эта гонка не проблематична. Возвращая извлеченные данные прямо в
pop()
, мы избегаем потенциальной гонки, которая могла бы случиться, если бы
top()
и
pop()
были отдельными функциями-членами, как в
std::stack<>
.

Далее, существует несколько возможных источников исключений. Операция захвата мьютекса может возбудить исключение, но, во-первых, это крайне редкий случай (свидетельствующий о проблемах в реализации мьютекса или о нехватке системных ресурсов), а, во-вторых, эта операция всегда выполняется в самом начале любой функции-члена. Поскольку в этот момент никакие данные еще не изменены, опасности нет. Операция освобождения мьютекса не может завершиться ошибкой, она всегда безопасна, а использование

std::lock_guard<>
гарантирует, что мьютекс не останется захваченным.

Вызов

data.push()
(1) может возбудить исключение, если его возбуждает копирование или перемещение данных либо если памяти недостаточно для увеличения размера структуры, в которой хранятся сами данные. В любом случае
std::stack<>
гарантирует безопасность, поэтому здесь проблемы тоже нет.

В первом перегруженном варианте

pop()
наш код может возбудить исключение
empty_stack
(2), но в этот момент еще ничего не изменено, так что мы в безопасности. Создание объекта
res
(3) может возбудить исключение по двум причинам: при обращении к
std::make_shared
может не хватить памяти для нового объекта и внутренних данных, необходимых для подсчёта ссылок, или копирующий либо перемещающий конструктор возбуждает исключение при копировании или перемещении данных в только что выделенную область памяти. В обоих случаях исполняющая среда С++ и стандартная библиотека гарантируют отсутствие утечек памяти и корректное уничтожение нового объекта (если он был создан). Поскольку мы все еще не модифицировали данные стека, все хорошо. Вызов
data.pop()
(4) гарантированно не возбуждает исключений, равно как и возврат результата, так что этот вариант
pop()
безопасен относительно исключений.

Второй перегруженный вариант

pop()
аналогичен, только на этот раз исключение может возбудить оператор копирующего или перемещающего присваивания (5), а не конструктор нового объекта или экземпляра
std::shared_ptr
. Но и теперь мы ничего не изменяли до вызова функции
data.pop()
(6), которая гарантированно не возбуждает исключений, так что и этот вариант безопасен относительно исключений.

Наконец, функция

empty()
вообще не изменяет данные, так что она точно безопасна относительно исключений

В этом коде есть две возможности для взаимоблокировки из-за того, что пользовательский код вызывается, когда удерживается блокировка: копирующий или перемещающий конструктор (1), (3) и копирующий или перемещающий оператор присваивания (5) хранимых в стеке данных. И еще —

operator new
, который также мог бы быть определён пользователем. Если любая из этих функций вызовет функции-члены стека, в который вставляется или из которого удаляется элемент, либо затребует какую-либо блокировку в момент, когда удерживается блокировка, захваченная при вызове функции-члена стека, то может возникнуть взаимоблокировка. Однако было бы разумно возложить ответственность за это на пользователей стека; невозможно представить себе разумную реализацию операций добавления в стек и удаления из стека, которые не копировали бы данные и не выделяли память.

Поскольку все функции-члены используют для защиты данных класс

std::lock_guard<>
, их можно безопасно вызывать из любого количества потоков. Единственные небезопасные функции-члены — конструкторы и деструкторы, но эта проблема не особенно серьезна; объект можно сконструировать и уничтожить только один раз. Вызов функций-членов не полностью сконструированного или частично уничтоженного объекта — это всегда плохо, и к одновременности доступа отношения не имеет. Таким образом, пользователь должен гарантировать, что никакой другой поток не может обратиться к стеку, пока он не будет сконструирован полностью, и что любая операция доступа завершается до начала его уничтожения.

Хотя благодаря блокировке несколько потоков могут одновременно вызывать функции-члены стека, в каждый момент времени с ним реально работает не более одного потока. Такая сериализация потоков может снизить производительность приложения в случае, когда имеется значительная конкуренция за стек, — пока поток ожидает освобождения блокировки, он не выполняет никакой полезной работы. Кроме того, стек не предоставляет средств, позволяющих ожидать добавления элемента. Следовательно, если потоку нужно получить из стека элемент, то он должен периодически опрашивать его с помощью

empty()
или просто вызывать
pop()
и обрабатывать исключение
empty_stack
.

Поэтому для такого сценария данная реализация стека неудачна, так как ожидающий поток должен либо впустую растрачивать драгоценные ресурсы, ожидая данных, либо пользователь должен писать внешний код ожидания и извещения (например, с помощью условных переменных), который сделает внутренний механизм блокировки избыточным и, стало быть, расточительным. Приведенная в главе 4 реализация очереди демонстрирует, как можно включить такое ожидание в саму структуру данных с помощью условной переменной. Это и станет нашим следующим примером.

6.2.2. Потокобезопасная очередь с блокировками и условными переменными

В листинге 6.2 воспроизведен код потокобезопасной очереди из главы 4. Если стек построен по образцу

std::stack<>
, то очередь — по образцу
std::queue<>
. Но ее интерфейс также отличается от стандартного адаптера контейнера, потому что запись в структуру данных должна быть безопасной относительно одновременного доступа из нескольких потоков.


Листинг 6.2. Потокобезопасная очередь с блокировками и условными переменными

template

class threadsafe_queue {

private:

 mutable std::mutex mut;

 std::queue data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}


 void push(T new_value) {

  std::lock_guard lk(mut);

  data_queue.push(std::move(data));

  data_cond.notify_one(); ←
(1)

 }


 void wait_and_pop(T& value) { ←
(2)

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = std::move(data_queue.front());

  data_queue.pop();

 }


 std::shared_ptr wait_and_pop() ←
(3)

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this] {return !data_queue.empty();});←
(4)

  std::shared_ptr res(

  std::make_shared(std::move(data_queue.front())));

  data_queue.pop();

  return res;

 }


 bool try_pop(T& value) {

  std::lock_guard lk(mut);

  if (data_queue.empty())

  return false;

  value = std::move(data_queue.front());

  data_queue.pop();

  return true;

 }


 std::shared_ptr try_pop() {

  std::lock_guard lk(mut);

  if (data_queue.empty())

  return std::shared_ptr(); ←
(5)

  std::shared_ptr res(

  std::make_shared(std::move(data_queue.front())));

  data_queue.pop();

  return res;

 }


 bool empty() const {

  std::lock_guard lk(mut);

  return data_queue.empty();

 }

};

Структурно очередь в листинге 6.2 реализована аналогично стеку в листинге 6.1, отличие только в обращениях к функции

data_cond.notify_one()
в
push()
(1) и в наличии двух вариантов функции
wait_and_pop()
(2), (3). Оба перегруженных варианта
try_pop()
почти идентичны функциям
pop()
в листинге 6.1 с тем отличием, что не возбуждают исключение, если очередь пуста. Вместо этого одна функция возвращает булевское значение, показывающее, были ли извлечены данные, а вторая — возвращающая указатель на данные (5) — указатель
NULL
. Точно так же можно было бы поступить и в случае стека. Таким образом, если оставить в стороне функции
wait_and_pop()
, то применим тот же анализ, который мы провели для стека.

Новые функции

wait_and_pop()
решают проблему ожидания значения в очереди, с которой мы столкнулись при обсуждении стека; вместо того чтобы раз за разом вызывать
empty()
, ожидающий поток может просто вызвать
wait_and_pop()
, а структура данных обслужит этот вызов с помощью условной переменной. Обращение к
data_cond.wait()
не вернет управление, пока во внутренней очереди не появится хотя бы один элемент, так что мы можем не беспокоиться но поводу того, что в этом месте кода возможна пустая очередь. При этом данные по-прежнему защищаются мьютексом. Таким образом, функции
wait_and_pop()
не вводят новых состояний гонки, не создают возможности взаимоблокировок и не нарушают никаких инвариантов.

В части безопасности относительно исключений есть мелкая неприятность — если помещения данных в очередь ожидают несколько потоков, то лишь один из них будет разбужен в результате вызова

data_cond.notify_one()
. Однако если этот поток возбудит исключение в
wait_and_pop()
, например при конструировании
std::shared_ptr<>
(4), то ни один из оставшихся потоков разбужен не будет. Если это неприемлемо, то можно заменить
notify_one()
на
data_cond.notify_all()
, тогда будут разбужены все потоки, но за это придётся заплатить — большая часть из них сразу же уснет снова, увидев, что очередь по-прежнему пуста. Другой вариант — включить в
wait_and_pop()
обращение к
notify_one()
в случае исключения, тогда другой поток сможет попытаться извлечь находящееся в очереди значение. Третий вариант — перенести инициализацию
std::shared_ptr<>
в
push()
и сохранять экземпляры
std::shared_ptr<>
, а не сами значения данных. Тогда при копировании
std::shared_ptr<>
из внутренней очереди
std::queue<>
никаких исключений возникнуть не может, и
wait_and_pop()
становится безопасной. В следующем листинге приведена реализация очереди, переработанная с учетом высказанных соображений.


Листинг 6.3. Потокобезопасная очередь, в которой хранятся объекты

std::shared_ptr

template

class threadsafe_queue {

private:

 mutable std::mutex mut;

 std::queue > data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}


 void wait_and_pop(T& value) {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = std::move(*data_queue.front()); ←
(1)

  data_queue.pop();

 }


 bool try_pop(T& value) {

  std::lock_guard lk(mut);

  if (data_queue.empty())

  return false;

  value = std::move(*data_queue.front()); ←
(2)

  data_queue.pop();

  return true;

 }


 std::shared_ptr wait_and_pop() {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  std::shared_ptr res = data_queue.front(); ←
(3)

  data_queue.pop();

  return res;

 }


 std::shared_ptr try_pop() {

  std::lock_guard lk(mut);

  if (data_queue.empty())

  return std::shared_ptr();

  std::shared_ptr res = data_queue.front(); ←
(4)

  data_queue.pop();

  return res;

 }


 void push(T new_value) {

  std::shared_ptr data(

  std::make_shared(std::move(new_value))); ←
(5)

  std::lock_guard lk(mut);

  data_queue.push(data);

  data_cond.notify_one();

 }


 bool empty() const {

  std::lock_guard lk(mut);

  return data_queue.empty();

 }

};

Последствия хранения данных, обернутых в

std::shared_ptr<>
, понятны: функции pop, которые получают значение из очереди в виде ссылки на переменную, теперь должны разыменовывать указатель (1), (2), а функции
pop
, которые возвращают
std::shared_ptr<>
, теперь могут напрямую извлекать его из очереди (3), (4) без дальнейших манипуляций.

У хранения данных в виде

std::shared_ptr<>
есть и еще одно преимущество: выделение памяти для нового объекта можно производить не под защитой блокировки в
push()
(5), тогда как в листинге 6.2 это приходилось делать в защищенном участке кода внутри
pop()
. Поскольку выделение памяти, вообще говоря, дорогая операция, это изменение весьма благотворно скажется на общей производительности очереди, так как уменьшается время удержания мьютекса, а, значит, у остальных потоков остается больше времени на полезную работу.

Как и в примере стека, применение мьютекса для защиты всей структуры данных ограничивает возможности распараллеливания работы с очередью; хотя ожидать доступа к очереди могут несколько потоков, выполняющих разные функции, в каждый момент лишь один совершает какие-то действия. Однако это ограничение отчасти проистекает из того, что мы пользуемся классом

std::queue<>
, — стандартный контейнер составляет единый элемент данных, который либо защищен, либо нет. Полностью взяв на себя управление деталями реализации структуры данных, мы сможем обеспечить мелкогранулярные блокировки и повысить уровень параллелизма.

6.2.3. Потокобезопасная очередь с мелкогранулярными блокировками и условными переменными

В листингах 6.2 и 6.3 имеется только один защищаемый элемент данных (

data_queue
) и, следовательно, только один мьютекс. Чтобы воспользоваться мелкогранулярными блокировками, мы должны заглянуть внутрь очереди и связать мьютекс с каждым хранящимся в ней элементом данных.

Проще всего реализовать очередь в виде односвязного списка, как показано на рис. 6.1. Указатель head направлен на первый элемент списка, и каждый элемент указывает на следующий. Когда данные извлекаются из очереди, в head записывается указатель на следующий элемент, после чего возвращается элемент, который до этого был в начале.

Добавление данных производится с другого конца. Для этого нам необходим указатель tail, направленный на последний элемент списка. Чтобы добавить узел, мы записываем в поле next в последнем элементе указатель на новый узел, после чего изменяем указатель tail, так чтобы он адресовал новый элемент. Если список пуст, то оба указателя head и tail равны

NULL
.

В следующем листинге показана простая реализация такой очереди с урезанным по сравнению с листингом 6.2 интерфейсом; мы оставили только функцию

try_pop()
и убрали функцию
wait_and_pop()
, потому что эта очередь поддерживает только однопоточную работу.

Рис. 6.1. Очередь, представленная в виде односвязного списка


Листинг 6.4. Простая реализация однопоточной очереди

template

class queue {

private:

 struct node {

  T data;

  std::unique_ptr next;

  node(T data_):

  data(std::move(data_)) {}

 };

 std::unique_ptr head;←
(1)

 node* tail;        ←
(2)


public:

 queue() {}

 queue(const queue& other) = delete;

 queue& operator=(const queue& other) = delete;


 std::shared_ptr try_pop() {

  if (!head) {

  return std::shared_ptr();

  }

  std::shared_ptr const res(

  std::make_shared(std::move(head->data)));

  std::unique_ptr const old_head = std::move(head);

  head = std::move(old_head->next);←
(3)

  return res;

 }


 void push(T new_value) {

  std::unique_ptr p(new node(std::move(new_value)));

  node* const new_tail = p.get();

  if (tail) {

  tail->next = std::move(p);←
(4)

  } else {

  head = std::move(p);    ←
(5)

  }

  tail = new_tail;      ←
(6)

}

};

Прежде всего отметим, что в листинге 6.4 для управления узлами используется класс

std::unique_ptr
, потому что он гарантирует удаление потерявших актуальность узлов (и содержащихся в них данных) без явного использования
delete
. За передачу владения отвечает
head
, тогда как
tail
является простым указателем на последний узел.

В однопоточном контексте эта реализация прекрасно работает, но при попытке ввести мелкогранулярные блокировки в многопоточном контексте возникают две проблемы. Учитывая наличие двух элементов данных (

head
(1) и
tail
(2)), мы в принципе могли бы использовать два мьютекса — для защиты
head
и
tail
соответственно. Но не всё так просто.

Самая очевидная проблема заключается в том, что

push()
может изменять как
head
(5), так и
tail
(6), поэтому придётся захватывать оба мьютекса. Это не очень хорошо, но не трагедия, потому что захватить два мьютекса, конечно, можно. Настоящая проблема возникает из-за того, что и
push()
, и
pop()
обращаются к указателю
next
в узле:
push()
обновляет
tail->next
(4), a
try_pop()
читает
head->next
(3). Если в очереди всего один элемент, то
head==tail
, и, значит,
head->next
и
tail->next
— один и тот же объект, который, следовательно, нуждается в защите. Поскольку нельзя сказать, один это объект или нет, не прочитав и
head
, и
tail
, нам приходится захватывать один и тот же мьютекс в
push()
и в
try_pop()
, и получается, что мы ничего не выиграли по сравнению с предыдущей реализацией. Есть ли выход из этого тупика?

Обеспечение параллелизма за счет отделения данных

Решить проблему можно, заранее выделив фиктивный узел, не содержащий данных, и тем самым гарантировать, что в очереди всегда есть хотя бы один узел, отделяющий голову от хвоста. В случае пустой очереди

head
и
tail
теперь указывают на фиктивный узел, а не равны
NULL
. Это хорошо, потому что
try_pop()
не обращается к
head->next
, если очередь пуста. После добавления в очередь узла (в результате чего в ней находится один реальный узел)
head
и
tail
указывают на разные узлы, так что гонки за
head->next
и
tail->next
не возникает. Недостаток этого решения в том, что нам пришлось добавить лишний уровень косвенности для хранения указателя на данные, чтобы поддержать фиктивный узел. В следующем листинге показано, как теперь выглядит реализация.


Листинг 6.5. Простая очередь с фиктивным узлом

template

class queue {

private:

 struct node {

  std::shared_ptr data;←
(1)

  std::unique_ptr next;

 };


 std::unique_ptr head;

 node* tail;


public:

 queue():

 head(new node), tail(head.get())
(2)

 {}


 queue(const queue& other) = delete;

 queue& operator=(const queue& other) = delete;

 std::shared_ptr try_pop() {

  if (head.get() ==tail)
(3)

  {

  return std::shared_ptr();

 }

  std::shared_ptr const res(head->data);
(4)

  std::unique_ptr old_head = std::move(head);

  head = std::move(old_head->next); ←
(5)

  return res; ←
(6)

 }


 void push(T new_value) {


                std::shared_ptr new_data(
            

  std::make_shared(std::move(new_value)));
(7)

  std::unique_ptr p(new node);
(8)

  tail->data = new_data;
(9)

  node* const new_tail = p.get();

  tail->next = std::move(p);

  tail = new_tail;

 }

};

Изменения в

try_pop()
минимальны. Во-первых, мы сравниваем
head
с
tail
(3), а не с
NULL
, потому что благодаря наличию фиктивного узла
head
никогда не может обратиться в
NULL
. Поскольку
head
имеет тип
std::unique_ptr
, для сравнения необходимо вызывать
head.get()
. Во-вторых, так как в
node
теперь хранится указатель на данные (1), то можно извлекать указатель непосредственно (4) без конструирования нового экземпляра
T
. Наиболее серьезные изменения претерпела функция
push()
: мы должны сначала создать новый экземпляр
T
в куче и передать владение им
std::shared_ptr<>
(7) (обратите внимание на использование функции
std::make_shared
, чтобы избежать накладных расходов на второе выделение памяти под счетчик ссылок). Вновь созданный узел станет новым фиктивным узлом, поэтому передавать конструктору значение
new_value
необязательно (8). Вместо этого мы записываем в старый фиктивный узел значение только что созданной копии —
new_value
(9). Наконец, первоначальный фиктивный узел следует создать в конструкторе (2).

Уверен, теперь вы задаетесь вопросом, что мы выиграли от всех этих изменений и как они помогут сделать код потокобезопасным. Разберемся. Функция

push()
теперь обращается только к
tail
, но не к
head
, и это, безусловно, улучшение.
try_pop()
обращается и к
head
, и к
tail
, но
tail
нужен только для начального сравнения, так что блокировка удерживается очень недолго. Основной выигрыш мы получили за счет того, что из-за наличия фиктивного узла
try_pop()
и
push()
никогда не оперируют одним и тем же узлом, так что нам больше не нужен всеохватывающий мьютекс. Стало быть, мы можем завести по одному мьютексу для
head
и
tail
. Но где расставить блокировки?

Мы хотим обеспечить максимум возможностей для распараллеливания, поэтому блокировки должны освобождаться как можно быстрее. С функцией

push()
всё просто: мьютекс должен быть заблокирован на протяжении всех обращений к
tail
, а это означает, что мы захватываем его после выделения памяти для нового узла (8) и перед тем, как записать данные в текущий последний узел (9). Затем блокировку следует удерживать до конца функции.

С

try_pop()
сложнее. Прежде всего, нам нужно захватить мьютекс для
head
и удерживать его, пока мы не закончим работать с
head
. По сути дела, этот мьютекс определяет, какой поток производит извлечение из очереди, поэтому захватить его надо в самом начале. После того как значение
head
изменено (5), мьютекс можно освободить; в момент, когда возвращается результат (6), он уже не нужен. Остается разобраться с защитой доступа к
tail
. Поскольку мы обращаемся к
tail
только один раз, то можно захватить мьютекс на время, требуемое для чтения. Проще всего сделать это, поместив операцию доступа в отдельную функцию. На самом деле, поскольку участок кода, в котором мьютекс для
head
должен быть заблокирован, является частью одной функции-члена, то будет правильнее завести отдельную функцию и для него тоже. Окончательный код приведён в листинге 6.6.


Листинг 6.6. Потокобезопасная очередь с мелкогранулярными блокировками

template

class threadsafe_queue {

private:

 struct node {

  std::shared_ptr data;

  std::unique_ptr next;

 };


 std::mutex head_mutex;

 std::unique_ptr head;

 std::mutex tail_mutex;

 node* tail;


 node* get_tail() {

  std::lock_guard tail_lock(tail_mutex);

  return tail;

 }


 std::unique_ptr pop_head() {

  std::lock_guard head_lock(head_mutex);

  if (head.get() == get_tail()) {

  return nullptr;

  }

  std::unique_ptr old_head = std::move(head);

  head = std::move(old_head->next);

  return old_head;

 }


public:

 threadsafe_queue():

  head(new node), tail(head.get()) {}

 threadsafe_queue(const threadsafe_queue& other) = delete;

 threadsafe_queue& operator=(

  const threadsafe_queue& other) = delete;


 std::shared_ptr try_pop() {

  std::unique_ptr old_head = pop_head();

  return old_head ? old_head->data : std::shared_ptr();

 }


 void push(T new_value) {

  std::shared_ptr new_data(

  std::make_shared(std::move(new_value)));

  std::unique_ptr p(new node);

  node* const new_tail = p.get();

  std::lock_guard tail_lock(tail_mutex);

  tail->data = new_data;

  tail->next = std::move(p);

  tail = new_tail;

 }

};

Давайте взглянем на этот код критически, памятуя о рекомендациях из раздела 6.1.1. Прежде чем искать, где нарушены инварианты, надо бы их точно сформулировать:

tail->next == nullptr
.

tail->data == nullptr
.

head == tail
означает, что список пуст.

• Для списка с одним элементом

head->next==tail
.

• Для каждого узла

x
списка, для которого
x!=tail
,
x->data
указывает на экземпляр
T
, a
x->next
— на следующий узел списка. Если
x->next==tail
, то
x
— последний узел списка.

• Если проследовать по указателям

next
, начиная с головы списка, то рано или поздно мы достигнем его хвоста.

Сама по себе, функция

push()
очень проста: все модификации данных защищены мьютексом
tail_mutex
, и инвариант при этом сохраняется, потому что новый хвостовой узел пуст и правильно установлены указатели
data
и
next
для старого хвостового узла, который теперь стал настоящим последним узлом списка.

Самое интересное происходит в функции

try_pop()
. Как выясняется, мьютекс
tail_mutex
нужен не только для защиты чтения самого указателя
tail
, но и чтобы предотвратить гонку при чтении данных из головного узла. Не будь этого мьютекса, могло бы получиться, что один поток вызывает
try_pop()
, а другой одновременно вызывает
push()
, и эти операции никак не упорядочиваются. Хотя каждая функция-член удерживает мьютекс, но это разные мьютексы, а функции могут обращаться к одним и тем же данным — ведь все данные появляются в очереди только благодаря
push()
. Раз потоки потенциально могут обращаться к одним и тем же данным без какого бы то ни было упорядочения, то возможна гонка за данными и, как следствие (см. главу 5), неопределенное поведение. К счастью, блокировка мьютекса
tail_mutex
в
get_tail()
решает все проблемы. Поскольку внутри
get_tail()
захватывается тот же мьютекс, что в
push()
, то оба вызова оказываются упорядоченными. Либо обращение к функции
get_tail(
) происходит раньше обращения к
push()
— тогда
get_tail()
увидит старое значение
tail
— либо после обращения к
push()
— и тогда она увидит новое значение
tail
и новые данные, присоединенные к прежнему значению
tail
.

Важно также, что обращение к

get_tail()
производится под защитой захваченного мьютекса
head_mutex
. Если бы это было не так, то между вызовом
get_tail()
и захватом
head_mutex
мог бы вклиниться вызов
pop_head()
, потому что другой поток вызвал
try_pop()
(и, следовательно,
pop_head()
) и захватил мьютекс первым, не давая первому потоку продолжить исполнение:

Эта реализация

std: :unique_ptr pop_head()←┘
некорректна

{                  
(1) Старое значение tail

получено не

 node* const old_tail = get_tail();←┘
под защитой head_mutex

 std::lock_guard head_lock(head_mutex);

 if (head.get() == old_tail)    ←
(2)

  return nullptr;

 }

 std::unique_ptr old_head = std::move(head);

 head = std::move(old_head->next);←
(3)

 return old_head;

}

При такой — некорректной — реализации в случае, когда

get_tail()
(1) вызывается вне области действия блокировки, может оказаться, что и
head
, и
tail
изменились к моменту, когда первому потоку удалось захватить
head_mutex
, и теперь возвращенный хвост мало того что больше не является хвостом, но и вообще не принадлежит списку. Тогда сравнение
head
с
old_tail
(2) не прошло бы, хотя
head
в действительности является последним узлом. Следовательно, после обновления (3) узел
head
мог бы оказаться в списке дальше
tail
, то есть за концом списка, что полностью разрушило бы структуру данных. В корректной реализации, показанной в листинге 6.6, вызов
get_tail()
производится под защитой
head_mutex
. Это означает, что больше никакой поток не сможет изменить
head
, a
tail
будет только отодвигаться от начала списка (по мере добавления новых узлов с помощью
push()
) — это вполне безопасно. Указатель
head
никогда не сможет оказаться дальше значения, возвращенного
get_tail()
, так что инварианты соблюдаются.

После того как

pop_head()
удалит узел из очереди, обновив
head
, мьютекс освобождается, и
try_pop()
может извлечь данные и удалить узел, если таковой был (или вернуть
NULL
-экземпляр класса
std::shared_ptr<>
, если узла не было), твердо зная, что она работает в единственном потоке, который имеет доступ к этому узлу.

Далее, внешний интерфейс является подмножеством интерфейса из листинга 6.2, поэтому ранее выполненный анализ остается в силе: в интерфейсе нет внутренне присущих состояний гонки.

Вопрос об исключениях более интересен. Поскольку мы изменили порядок выделения памяти, исключения могут возникать в других местах. Единственные операции в

try_pop()
, способные возбудить исключение, — это захваты мьютексов, но пока мьютексы не захвачены, данные не модифицируются. Поэтому
try_pop()
безопасна относительно исключений. С другой стороны,
push()
выделяет из кучи память для объектов
T
и
node
, и каждая такая операция может возбудить исключение. Однако оба вновь созданных объекта присваиваются интеллектуальным указателям, поэтому в случае исключения память корректно освобождается. После того как мьютекс захвачен, ни одна из последующих операций внутри
push()
не может возбудить исключение, так что мы снова в безопасности.

Поскольку мы не изменяли интерфейс, то новых внешних возможностей для взаимоблокировки не возникло. Внутренних возможностей также нет; единственное место, где захватываются два мьютекса, — это функция

pop_head()
, но она всегда захватывает сначала
head_mutex
, а потом
tail_mutex
, так что взаимоблокировки не случится.

Осталось рассмотреть только один вопрос — в какой мере возможно распараллеливание. Эта структура данных предоставляет куда больше таких возможностей, чем приведенная в листинге 6.2, потому что гранулярность блокировок мельче, и больше работы выполняется не под защитой блокировок. Например, в

push()
память для нового узла и нового элемента данных выделяется, когда ни одна блокировка не удерживается. Это означает, что несколько потоков могут спокойно выделять новые узлы и элементы данных в одно и то же время. В каждый момент времени только один поток может добавлять новый узел в список, но выполняющий это действие код сводится к нескольким простым присваиваниям указателей, так что блокировка удерживается совсем недолго по сравнению с реализацией на основе
std::queue<>
, где мьютекс остается захваченным в течение всего времени, пока выполняются операции выделения памяти внутри
std::queue<>
.

Кроме того,

try_pop()
удерживает
tail_mutex
лишь на очень короткое время, необходимое для защиты чтения
tail
. Следовательно, почти все действия внутри
try_pop()
могут производиться одновременно с вызовом
push()
. Объем операций, выполняемых под защитой мьютекса
head_mutex
также совсем невелик; дорогостоящая операция
delete
(в деструкторе указателя на узел) производится вне блокировки. Это увеличивает потенциальное число одновременных обращений к
try_pop()
; в каждый момент времени только один поток может вызывать
pop_head()
, зато несколько потоков могут удалять старые узлы и безопасно возвращать данные.

Ожидание поступления элемента

Ну хорошо, код в листинге 6.6 дает в наше распоряжение потокобезопасную очередь с мелкогранулярными блокировками, но он поддерживает только функцию

try_pop()
(и к тому же всего в одном варианте). А как насчет таких удобных функций
wait_and_pop()
, которые мы написали в листинге 6.2? Сможем ли мы реализовать идентичный интерфейс, сохранив мелкогранулярные блокировки?

Ответ, разумеется, — да, только вот как это сделать? Модифицировать

push()
несложно: нужно лишь добавить вызов
data_cond.notify_one()
в конец функции, как и было в листинге 6.2. Но на самом деле не всё так просто; мы же связались с мелкогранулярными блокировками для того, чтобы увеличить уровень параллелизма. Если оставить мьютекс захваченным на все время вызова
notify_one()
(как в листинге 6.2), то поток, разбуженный до того, как мьютекс освобожден, должен будет ждать мьютекса. С другой стороны, если освободить мьютекс до обращения к notify_one(), то ожидающий поток сможет захватить его сразу, как проснётся (если, конечно, какой-то другой поток не успеет раньше). Это небольшое улучшение, но в некоторых случаях оно бывает полезно.

Функция

wait_and_pop()
сложнее, потому что мы должны решить, где поместить ожидание, какой задать предикат и какой мьютекс захватить. Мы ждем условия «очередь не пуста», оно представляется выражением
head != tail
. Если записать его в таком виде, то придется захватывать и
head_mutex
, и
tail_mutex
, но, разбирая код в листинге 6.6, мы уже поняли, что захватывать
tail_mutex
нужно только для чтения
tail
, а не для самого сравнения, та же логика применима и здесь. Если записать предикат в виде
head != get_tail()
, то нужно будет захватить только
head_mutex
и использовать уже полученную блокировку для защиты
data_cond.wait()
. Прочий код такой же, как в
try_pop()
.

Второй перегруженный вариант

try_pop()
и соответствующий ему вариант
wait_and_pop()
нуждаются в тщательном осмыслении. Если просто заменить возврат указателя
std::shared_ptr<>
, полученного из
old_head
, копирующим присваиванием параметру
value
, то функция перестанет быть безопасной относительно исключений. В этот момент элемент данных уже удален из очереди и мьютекс освобожден, осталось только вернуть данные вызывающей программе. Однако, если копирующее присваивание возбудит исключение (а почему бы и нет?), то элемент данных будет потерян, потому что вернуть его в то же место очереди, где он был, уже невозможно.

Если фактический тип

T
, которым конкретизируется шаблон, обладает не возбуждающими исключений оператором перемещающего присваивания или операцией обмена (
swap
), то так поступить можно, но ведь мы ищем общее решение, применимое к любому типу
T
. В таком случае следует поместить операции, способные возбудить исключения, в защищенную область перед тем, как удалять узел из списка. Это означает, что нам необходим еще один перегруженный вариант
pop_head()
, который извлекает сохраненное значение до модификации списка.

Напротив, модификация функции empty() тривиальна: нужно просто захватить

head_mutex
и выполнить проверку
head == get_tail()
(см. листинг 6.10). Окончательный код очереди приведён в листингах 6.7, 6.8, 6.9 и 6.10.


Листинг 6.7. Потокобезопасная очередь с блокировкой и ожиданием: внутренние данные и интерфейс

template

class threadsafe_queue {

private:

 struct node {

  std::shared_ptr data;

  std::unique_ptr next;

 };

 std::mutex head_mutex;

 std::unique_ptr head;

 std::mutex tail_mutex;

 node* tail;

 std::condition_variable data_cond;


public:

 threadsafe_queue():

  head(new node), tail(head.get()) {}

 threadsafe_queue(const threadsafe_queue& other) = delete;

 threadsafe_queue& operator=(

  const threadsafe_queue& other) = delete;

 std::shared_ptr try_pop();

 bool try_pop(T& value);

 std::shared_ptr wait_and_pop();

 void wait_and_pop(T& value);

 void push(T new_value);

 void empty();

};

Код, помещающий новые узлы в очередь, прост — его реализация (показанная в листинге ниже) близка к той, что мы видели раньше.


Листинг 6.8. Потокобезопасная очередь с блокировкой и ожиданием: добавление новых значений

template

void threadsafe_queue::push(T new_value) {

 std::shared_ptr new_data(

  std::make_shared(std::move(new_value)));


 std::unique_ptr p(new node);

 {

  std::lock_guard tail_lock(tail_mutex);

  tail->data = new_data;

  node* const new_tail = p.get();

  tail->next = std::move(p);

  tail = new_tail;

 }

 data_cond.notify_one();

}

Как уже отмечалось, вся сложность сосредоточена в части pop. В листинге ниже показана реализация функции-члена

wait_and_pop()
и относящихся к ней вспомогательных функций.


Листинг 6.9. Потокобезопасная очередь с блокировкой и ожиданием:

wait_and_pop

template

class threadsafe_queue {

private:

 node* get_tail() {

  std::lock_guard tail_lock(tail_mutex);

  return tail;

 }


 std::unique_ptr pop_head() {←
(1)

  std::unique_ptr old_head = std::move(head);

  head = std::move(old_head->next);

  return old_head;

 }


 std::unique_lock wait_for_data() {←
(2)

  std::unique_lock head_lock(head_mutex);

  data_cond.wait(

  head_lock, [&]{return head.get() != get_tail();});

  return std::move(head_lock);         ←
(3)

 }


 std::unique_ptr wait_pop_head() {

  std::unique_lock head_lock(wait_for_data());←
(4)

  return pop_head();

 }


 std::unique_ptr wait_pop_head(T& value) {

  std::unique_lock head_lock(wait_for_data());←
(5)

  value = std::move(*head->data);

  return pop_head();

 }

public:

 std::shared_ptr wait_and_pop() {

  std::unique_ptr const old_head = wait_pop_head();

  return old_head->data;

 }


 void wait_and_pop(T& value) {

  std::unique_ptr const old_head = wait_pop_head(value);

 }

};

В реализации извлечения из очереди используется несколько небольших вспомогательных функций, которые упрощают код и позволяют устранить дублирование, например:

pop_head()
(1) (модификация списка в результате удаления головного элемента) и
wait_for_data()
(2) (ожидание появления данных в очереди). Особенно стоит отметить функцию
wait_for_data()
, потому что она не только ждет условную переменную, используя лямбда-функцию в качестве предиката, но и возвращает объект блокировки вызывающей программе (3). Тем самым мы гарантируем, что та же самая блокировка удерживается, пока данные модифицируются в соответствующем перегруженном варианте
wait_pop_head()
(4), (5). Функция
pop_head()
используется также в функции
try_pop()
, показанной ниже.


Листинг 6.10. Потокобозопасная очередь с блокировкой и ожиданием:

try_pop()
и
empty()

template

class threadsafe_queue {

private:

 std::unique_ptr try_pop_head() {

  std::lock_guard head_lock(head_mutex);

  if (head.get() == get_tail()) {

  return std::unique_ptr();

  }

  return pop_head();

 }


 std::unique_ptr try_pop_head(T& value) {

  std::lock_guard head_lock(head_mutex);

  if (head.get() == get_tail()) {

  return std::unique_ptr();

  }

  value = std::move(*head->data);

  return pop_head();

 }


public:

 std::shared_ptr try_pop() {

  std::unique_ptr old_head = try_pop_head();

  return old_head ? old_head->data : std::shared_ptr();

 }


 bool try_pop(T& value) {

  std::unique_ptr const old_head = try_pop_head(value);

  return old_head;

 }


 void empty() {

  std::lock_guard head_lock(head_mutex);

  return (head.get() == get_tail());

 }

};

Эта реализация очереди ляжет в основу очереди без блокировок, которую мы будем рассматривать в главе 7. Данная очередь неограниченная, то есть в нее можно помещать и помещать данные, ничего не удаляя, пока не кончится память. Альтернативой является ограниченная очередь, максимальная длина которой задается при создании. Попытка поместить элемент в заполненную очередь либо завершается ошибкой, либо приводит к приостановке потока до тех пор, пока из очереди не будет удален хотя бы один элемент. Ограниченные очереди бывают полезны для равномерного распределения задач между потоками (см. главу 8). Такая дисциплина не дает потоку (или потокам), заполняющему очередь, намного обогнать потоки, читающие из очереди.

Показанную реализацию неограниченной очереди легко преобразовать в очередь с ограниченной длиной, введя ожидание условной переменной в функцию

push()
. Вместо того чтобы ждать, пока в очереди появятся элементы (как
pop()
), мы должны будем ждать, когда число элементов в ней окажется меньше максимума. Дальнейшее обсуждение ограниченных очередей выходит за рамки этой книги, мы же перейдём от очередей к более сложным структурам данных.

6.3. Проектирование более сложных структур данных с блокировками

Стеки и очереди — простые структуры данных, их интерфейс крайне ограничен, и используются они в очень узких целях. Но не все структуры данных так просты, как правило они поддерживают более широкий набор операций. В принципе, это может открыть больше возможностей для распараллеливания, но и проблема защиты данных сильно усложняется, так как приходится учитывать много разных способов доступа. При проектировании структур данных, предназначенных для параллельного доступа, важно принимать во внимание характер выполняемых операций.

Чтобы понять, какие трудности возникают на этом пути, рассмотрим справочную таблицу (lookup table).

6.3.1. Разработка потокобезопасной справочной таблицы с блокировками

Справочная таблица, или словарь позволяет ассоциировать значения одного типа (типа ключа) со значениями того же или другого типа (ассоциированного типа). В общем случае задача такой структуры — запрашивать данные, ассоциированные с данным ключом. В стандартной библиотеке С++ для этой цели предназначены ассоциативные контейнеры:

std::map<>
,
std::multimap<>
,
std::unordered_map<>
, и
std::unordered_multimap<>
.

Справочная таблица используется иначе, чем стек или очередь. Если большинство операций со стеком и очередью каким-то образом модифицируют структуру данных, то есть либо добавляют, либо удаляют элементы, то справочная таблица изменяется сравнительно редко. Примером может служить простой DNS-кэш из листинга 3.13, предоставляющий интерфейс, сильно урезанный по сравнению с

std::map<>
. При рассмотрении стека и очереди мы видели, что интерфейсы стандартных контейнеров не годятся для случая, когда к структуре данных одновременно обращаются несколько потоков, так как им внутренне присущи состояния гонки. Поэтому интерфейсы приходится пересматривать и сокращать.

Самая серьезная с точки зрения распараллеливания проблема в интерфейсе контейнера

std::map<>
— его итераторы. Можно написать итератор так, что доступ к контейнеру с его помощью для чтения и модификации будет безопасен, но это амбициозная задача. Для корректной работы нужно учесть, что другой поток может удалить элемент, на который итератор сейчас ссылается, а это совсем непросто. Поэтому при первом подходе к проектированию интерфейса потокобезопасной справочной таблицы итераторы мы опустим. Памятуя о том, насколько сильно интерфейс
std::map<>
(и прочих стандартных ассоциативных контейнеров) зависит от итераторов, пожалуй, будет разумнее сразу отказаться от попыток смоделировать их и вместо этого придумать новый интерфейс с нуля.

Справочной таблице нужно всего несколько основных операций:

• добавить новую пару ключ/значение;

• изменить значение, ассоциированное с данным ключом;

• удалить ключ и ассоциированное с ним значение;

• получить значение, ассоциированное с данным ключом, если такой ключ существует.

Есть также несколько полезных операций, относящихся к контейнеру в целом, например: проверить, пуст ли контейнер, получить полный текущий список ключей или полное множество пар ключ/ значение.

Если придерживаться базовых рекомендаций, касающихся потокобезопасности, например, не возвращать ссылки и защищать мьютексом все тело каждой функции-члена, то все эти операции будут безопасны. Угроза возникновения гонки возникает прежде всего при добавлении новой пары ключ/значение; если два потока одновременно добавляют новое значение, то лишь один из них будет первым, а второй, следовательно, получит ошибку. Один из способов решить эту проблему состоит в том, чтобы объединить операции добавления и изменения в одной функции-члене, как мы проделали в DNS-кэше в листинге 3.13.

С точки зрения интерфейса, остается еще лишь один интересный момент: фраза «если такой ключ существует» в описании операции получения ассоциированного значения. Можно, например, предоставить пользователю возможность задать некий результат «по умолчанию», который будет возвращен, если указанного ключа нет.

mapped_type get_value(

 key_type const& key, mapped_type default_value);

В таком случае можно использовать сконструированный по умолчанию экземпляр типа

mapped_type
, если
default_value
не было задано явно. Эту идею можно развить и возвращать не просто экземпляр
mapped_type
, а объект типа
std::pair
, где
bool
указывает, было ли найдено значение. Другой вариант — использовать интеллектуальный указатель, ссылающийся на значение; если он равен
NULL
, то значение не было найдено.

Как уже отмечалось, после определения интерфейса (в предположении, что состояний гонки в нем нет), обеспечить потокобезопасность можно, заведя единственный мьютекс, который дет захватываться в начале каждой функции-члена для защиты внутренней структуры данных. Однако тем самым мы сведем на нет все возможности распараллеливания, которые могли бы открыться в результате наличия отдельных функций для чтения и модификации структуры данных. Отчасти исправить ситуацию можно было бы, воспользовавшись мьютексом, который разрешает нескольким потокам читать данные, но только одному — изменять их, например, классом

boost::shared_mutex
из листинга 3.13. Это, конечно, несколько улучшило бы общую картину, но при таком решении модифицировать структуру данных по-прежнему может только один поток. В идеале хотелось бы добиться большего.

Проектирование структуры данных для справочной таблицы с мелкогранулярными блокировками

Как и в случае очереди (см. раздел 6.2.3), чтобы ввести мелкогранулярные блокировки, нужно внимательно изучить особенности структуры данных, а не пытаться обернуть уже имеющийся контейнер, например

std::map<>
. Есть три общепринятых подхода к реализации ассоциативного контейнера, каковым является, в частности, справочная таблица:

• двоичное, например красно-черное, дерево;

• отсортированный массив;

• хеш-таблица.

Двоичное дерево плохо приспособлено для распараллеливания — каждый просмотр или модификация должны начинается с доступа к корневому узлу, который, следовательно, нужно блокировать. Блокировку, конечно, можно освобождать по мере спуска вниз по дереву, но в целом это немногим лучше блокирования всей структуры целиком.

Отсортированный массив еще хуже, потому что заранее невозможно сказать, в каком месте массива может оказаться требуемое значение, поэтому придется заводить одну блокировку на весь массив. Остается только хеш-таблица. В предположении, что число кластеров фиксировано, вопрос о том, в каком кластере находится ключ, является свойством только лишь самого ключа и хеш-функции. А это значит, что мы сможем завести по одной блокировке на каждый кластер. Если еще и использовать мьютекс, который поддерживает несколько читателей и одного писателя, то коэффициент распараллеливания увеличится в N раз, где N — число кластеров. Недостаток в том, что нужна хорошая хеш-функция для ключа. В стандартной библиотеке С++ имеется шаблон

std::hash<>
, которым можно воспользоваться для этой цели. Он уже специализирован для таких фундаментальных типов, как
int
, и некоторых библиотечных типов, например
std::string
, а пользователь может без труда написать специализации и для других типов ключа. Если, следуя примеру стандартных неупорядоченных контейнеров, указать в качестве параметра шаблона тип объекта-функции, которая выполняет хеширование, то пользователь сможет сам решить, специализировать ли ему шаблон
std::hash<>
для типа своего ключа или предоставить отдельную хеш-функцию.

Теперь обратимся собственно к коду. Как могла бы выглядеть реализация потокобезопасной справочной таблицы? Один из вариантов показан ниже.


Листинг 6.11. Потокобезопасная справочная таблица

template

      typename Hash = std::hash >

class threadsafe_lookup_table {

private:

 class bucket_type {

 private:

  typedef std::pair bucket_value;

  typedef std::list bucket_data;

  typedef typename bucket_data::iterator bucket_iterator;


  bucket_data data;

  mutable boost::shared_mutex mutex;←
(1)


  bucket_iterator find_entry_for(Key const& key) const {←
(2)

  return std::find_if(data.begin(), data.end(),

            [&](bucket_value const& item)

  { return item.first == key; });

  }


 public:

  Value value_for(

  Key const& key, Value const& default_value) const {

  boost::shared_lock lock(mutex);←
(3)

  bucket_iterator const found_entry = find_entry_for(key);

  return (found_entry==data.end()) ?

      default_value : found_entry->second;

  }


  void add_or_update_mapping(

  Key const& key, Value const& value) {

  std::unique_lock lock(mutex);←
(4)

  bucket_iterator const found_entry = find_entry_for(key);

  if (found_entry == data.end()) {

   data.push_back(bucket_value(key, value));

  } else {

   found_entry->second = value;

  }

  }


 void remove_mapping(Key const& key) {

  std::unique_lock lock(mutex);←
(5)

  bucket_iterator const found_entry = find_entry_for(key);

  if (found_entry != data.end()) {

   data.erase(found_entry);

  }

  }

 };


 std::vector> buckets;←
(6)

 Hash hasher;


 bucket_type& get_bucket(Key const& key) const {← 
(7)

  std::size_t const bucket_index = hasher(key)%buckets.size();

  return *buckets[bucket_index];

 }

public:

 typedef Key key_type;

 typedef Value mapped_type;

 typedef Hash hash_type;


 threadsafe_lookup_table(

  unsigned num_buckets = 19,

  Hash const& hasher_ = Hash()):

 buckets(num_buckets), hasher(hasher_) {

  for (unsigned i = 0; i < num_buckets; ++i) {

  buckets[i].reset(new bucket_type);

  }

 }


 threadsafe_lookup_table(

  threadsafe_lookup_table const& other) = delete;

 threadsafe_lookup_table& operator=(

  threadsafe_lookup_table const& other) = delete;


 Value value_for(Key const& key,

  Value const& default_value = Value()) const {

  return get_bucket(key).value_for(key, default_value);←
(8)

 }


 void add_or_update_mapping(Key const& key,Value const& value) {

  get_bucket(key).add_or_update_mapping(key, value);←
(9)

 }


 void remove_mapping(Key const& key) {

  get_bucket(key).remove_mapping(key);←
(10)

 }

};

В этой реализации для хранения кластеров используется вектор

std::vector>
(6), это позволяет задавать число кластеров в конструкторе. По умолчанию оно равно 19 (произвольно выбранное простое число); оптимальные показатели работы хеш-таблиц достигаются, когда имеется простое число кластеров. Каждый кластер защищен мьютексом типа
boost::shared_mutex
(1), который позволяет нескольким потокам одновременно читать, но только одному обращаться к любой из функций модификации данного кластера.

Поскольку количество кластеров фиксировано, функцию

get_bucket()
(7) можно вызывать вообще без блокировки (8), (9), (10), а затем захватить мьютекс кластера для совместного (только для чтения) (3) или монопольного (чтение/запись) (4), (5) владения — в зависимости от вызывающей функции.

Во всех трех функциях используется функция-член кластера

find_entry_for()
(2), которая определяет, есть ли в данном кластере указанный ключ. Каждый кластер содержит всего лишь список
std::list<>
пар ключ/значение, поэтому добавлять и удалять элементы легко.

Я уже рассматривал это решение с точки зрения распараллеливания, и мы убедились, что все надежно защищено мьютексами. Но как обстоит дело с безопасностью относительно исключений? Функция

value_for
ничего не изменяет, поэтому с ней проблем нет: если она и возбудит исключение, то на структуру данных это не повлияет.

Функция

remove_mapping
модифицирует список, обращаясь к его функции-члену
erase
, которая гарантированно не возбуждает исключений, так что здесь тоже всё безопасно. Остается функция
add_or_update_mapping
, которая может возбуждать исключения в обеих ветвях
if
. Функция
push_back
безопасна относительно исключений, то есть в случае исключения оставляет список в исходном состоянии, так что с этой ветвью всё хорошо. Единственная проблема — присваивание в случае замены существующего значения; если оператор присваивания возбуждает исключение, то мы полагаемся на то, что он оставляет исходный объект неизмененным. Однако это не влияет на структуру данных в целом и является свойством пользовательского типа, так что пускай пользователь и решает эту проблему.

В начале этого раздела я упоминал, что было бы хорошо, если бы можно было получить текущее состояние справочной таблицы, например, в виде объекта

std::map<>
. Чтобы копия состояния была согласована, потребуется заблокировать контейнер целиком, то есть все кластеры сразу. Поскольку для «обычных» операций захватывается мьютекс только одного кластера, то получение копии состояния будет единственной операцией, блокирующей все кластеры. При условии, что мьютексы всегда захватываются в одном и том же порядке, взаимоблокировка не возникнет. Такая реализация приведена в следующем листинге.


Листинг 6.12. Получение содержимого таблицы

threadsafe_lookup_table
в виде
std::map<>

std::map threadsafe_lookup_table::get_map() const {

 std::vector > locks;

 for (unsigned i = 0; i < buckets.size(); ++i) {

  locks.push_back(

  std::unique_lock(buckets[i].mutex));

 }


 std::map res;

 for (unsigned i = 0; i < buckets.size(); ++i) {

  for (bucket_iterator it = buckets[i].data.begin();

  it != buckets[i].data.end(); ++it) {

  res.insert(*it);

  }

 }

 return res;

}

Реализация справочной таблицы, показанная в листинге 6.11, увеличивает уровень параллелизма таблицы в целом за счет того, что каждый кластер блокируется отдельно, и при этом используется

boost::shared_mutex
, который позволяет нескольким потокам одновременно читать кластер. Но нельзя ли добиться большего уровня параллелизма, еще уменьшив гранулярность блокировок? В следующем разделе мы покажем, как это сделать, воспользовавшись потокобезопасным списковым контейнером с поддержкой итераторов.

6.3.2. Потокобезопасный список с блокировками

Список — одна из самых простых структур данных, и, наверное, написать его потокобезопасную версию будет несложно, правда? Все зависит от того, какая вам нужна функциональность и требуется ли поддержка итераторов — та самая, которую я побоялся включать в справочную таблицу на том основании, что это слишком сложно. Основная идея итератора в духе STL состоит в том, что итератор содержит своего рода ссылку на элемент внутренней структуры данных, образующей контейнер. Если контейнер модифицируется из другого потока, то ссылка должна оставаться действительной, а это значит, что итератор должен удерживать блокировку на какую-то часть структуры. Учитывая, что контейнер никак не контролирует время жизни STL-итератора, такой подход абсолютно непродуктивен.

Альтернатива — включить функции итерирования, например

for_each
, в сам контейнер. Тогда контейнер сможет полностью управлять своим обходом и блокировкой, но зато перестаёт отвечать рекомендациям но предотвращению взаимоблокировок из главы 3. Чтобы функция
for_each
делала что-то полезное, она должна вызывать предоставленный пользователем код, удерживая блокировку. Хуже того, она должна передавать ссылку на каждый элемент контейнера пользовательскому коду, чтобы тот мог к этому элементу обратиться. Можно было бы вместо ссылки передавать копию элемента, но это обойдется слишком дорого, если размер элементов велик.

Таким образом, мы оставим на совести пользователя заботу о предотвращении взаимоблокировок, предупредив его, что в пользовательских функциях не следует ни захватывать блокировки, ни сохранять ссылки, которые могли бы использоваться для доступа к данным вне защиты блокировок и тем самым привести к гонке. В случае внутреннего списка, используемого в реализации справочной таблицы, это абсолютно безопасно, поскольку мы не собираемся делать ничего противозаконного.

Остается решить, какие операции должен поддерживать список. Вернувшись к листингам 6.11 и 6.12, вы увидите, что именно нам требуется:

• добавлять элемент в список;

• удалять элемент из списка, если он удовлетворяет некоторому условию;

• искать в списке элемент, удовлетворяющий некоторому условию;

• изменить элемент, удовлетворяющий некоторому условию;

• скопировать все элементы списка в другой контейнер.

Чтобы получился приличный списковый контейнер общего назначения, полезно было бы добавить еще кое-какие операции, например вставку в указанную позицию, но для нашей справочной таблицы это излишне, поэтому оставляю реализацию в качестве упражнения для читателя.

Основная идея связанного списка с мелкогранулярными блокировками — завести по одному мьютексу на каждый узел. Если список длинный, то получится целая куча мьютексов! Достоинство заключается в том, что операции над отдельными частями списка полностью распараллеливаются: каждая операция удерживает только блокировки на узлы, которыми манипулирует, и освобождает блокировку при переходе к следующему узлу. В листинге ниже приведена реализация такого списка.


Листинг 6.13. Потокобезопасный список с поддержкой итераторов

template

class threadsafe_list {

 struct node { ←
(1)

  std::mutex m;

  std::shared_ptr data;

  std::unique_ptr next;


  node() : ←
(2)

  next() {}


 node(T const& value) : ←
(3)

  data(std::make_shared(value)) {}

 };


 node head;


public:

 threadsafe_list() {}

 ~threadsafe_list() {

  remove_if([](node const&){return true;});

 }


 threadsafe_list(threadsafe_list const& other) = delete;

 threadsafe_list& operator=(

  threadsafe_list const& other) = delete;


 void push_front(T const& value) {

  std::unique_ptr new_node(new node(value));←
(4)

  std::lock_guard lk(head.m);

  new_node->next = std::move(head.next);      ←
(5)

  head.next = std::move(new_node);         ←
(6)

 }


 template

 void for_each(Function f) {           ←
(7)

  node* current = &head;

  std::unique_lock lk(head.m);    ←
(8)

  while(node* const next = current->next.get()) {←
(9)

  std::unique_lock next_lk(next->m);←
(10)

  lk.unlock();      ←
(11)

  f(*next->data);     ←
(12)

  current = next;

  lk = std::move(next_lk);←
(13)

  }

 }


 template

 std::shared_ptr find_first_if(Predicate p) {←
(14)

  node* current = &head;

  std::unique_lock lk(head.m);

  while (node* const next=current->next.get()) {

  std::unique_lock next_lk(next->m);

  lk.unlock();

  if (p(*next->data)) {←
(15)

   return next->data;  ←
(16)

  }

  current = next;

   lk = std::move(next_lk);

  }

  return std::shared_ptr();

 }


 template
(17)

 void remove_if(Predicate p) {

 node* current = &head;

  std::unique_lock lk(head.m);

  while(node* const next = current->next.get()) {

  std::unique_lock next_lk(next->m);

  if (p(*next->data)) {          ←
(18)

   std::unique_ptr old_next = std::move(current->next);

   current->next = std::move(next->next);←
(19)

   next_lk.unlock();

  }       ←
(20)

  else {

   lk.unlock();←
(21)

   current = next;

   lk = std::move(next_lk);

  }

  }

 }

};

Показанный в листинге 6.13 шаблон

threadsafe_list<>
— это реализация односвязного списка, в котором каждый элемент является структурой типа
node
(1). В роли головы
head
списка выступает сконструированный по умолчанию объект
node
, в котором указатель
next
равен
NULL
(2). Новые узлы добавляются в список функцией
push_front()
; сначала новый узел конструируется (4), при этом для хранимых в нем данных выделяется память из кучи (3), но указатель
next
остается равным
NULL
. Затем мы должны захватить мьютекс для узла
head
, чтобы получить нужное значение указателя next (5), после чего вставить узел в начало списка, записав в
head.next
указатель на новый узел (6). Пока всё хорошо: для добавления элемента в список необходимо захватить только один мьютекс, поэтому никакого риска взаимоблокировки нет. Кроме того, медленная операция выделения памяти происходит вне блокировки, так что блокировка защищает только обновление двух указателей — действия, которые не могут привести к ошибке. Переходим к функциям итерирования.

Для начала рассмотрим функцию

for_each()
(7). Она принимает объект
Function
, который применяется к каждому элементу списка; следуя примеру большинства библиотечных алгоритмов, этот объект передаётся по значению и может быть как настоящей функцией, так и объектом класса, в котором определена оператор вызова. В данном случае функция должна принимать в качестве единственного параметра значение типа
T
. Здесь мы производим передачу блокировки. Сначала захватывается мьютекс в головном узле
head
(8). Теперь можно безопасно получить указатель на следующий узел
next
(с помощью
get()
, потому что мы не принимаем на себя владение указателем). Если этот указатель не равен
NULL
(9), то захватываем мьютекс в соответствующем узле (10), чтобы обработать данные. Получив блокировку для этого узла, мы можем освободить блокировку для предыдущего узла (11) и вызвать указанную функцию (12). По выходе из этой функции мы можем обновить указатель
current
на только что обработанный узел и с помощью
move
передать владение блокировкой от
next_lk
в
lk
(13). Поскольку
for_each
передаёт каждый элемент данных напрямую пользовательской функции
Function
, мы можем обновить данные, скопировать их в другой контейнер и вообще сделать всё, что угодно. Если функция не делает того, чего нельзя, то это безопасно, потому что на всем протяжении вызова удерживается мьютекс узла, содержащего элемент данных.

Функция

find_first_if()
(14) аналогична
for_each()
; существенное отличие заключается в том, что переданный предикат
Predicate
должен вернуть
true
, если нужный элемент найден, и
false
в противном случае (15). Если элемент найден, то мы сразу возвращаем хранящиеся в нем данные (16), прерывая поиск. Можно было бы добиться того же результата с помощью
for_each()
, но тогда мы продолжили бы просмотр списка до конца, хотя после обнаружения искомого элемента в этом уже нет необходимости.

Функция

remove_if()
(17) несколько отличается, потому что она должна изменить список;
for_each()
для этой цели непригодна. Если предикат
Predicate
возвращает
true
(18), то мы удаляем узел из списка, изменяя значение
current->next
(19). Покончив с этим, мы можем освободить удерживаемый мьютекс следующего узла. Узел удаляется, когда объект
std::unique_ptr
, в который мы его переместили, покидает область видимости (20). В данном случае мы не изменяем
current
, потому что необходимо проверить следующий узел
next
. Если
Predicate
возвращает
false
, то нужно просто продолжить обход списка, как и раньше (21).

А могут ли при таком обилии мьютексов возникнуть взаимоблокировки или состояния гонки? Ответ — твердое нет, при условии, что полученные от пользователя предикаты и функции ведут себя, как положено. Итерирование всегда производится в одном направлении, начиная с узла

head
, и следующий мьютекс неизменно блокируется до освобождения текущего, поэтому не может случиться так, что в разных потоках порядок захвата мьютексов будет различен. Единственный потенциальный кандидат на возникновение гонки — удаление исключенного из списка узла в функции
remove_if()
(20), потому что это делается после освобождения мьютекса (уничтожение захваченного мьютекса приводит к неопределённому поведению). Однако, немного поразмыслив, мы придём к выводу, что это безопасно, так как в этот момент все еще удерживается мьютекс предыдущего узла (
current
), поэтому ни один другой поток не сможет попытаться захватить мьютекс удаляемого узла.

Что можно сказать по поводу распараллеливания? Вся эта возня с мелкогранулярными блокировками затевалась для того, чтобы увеличить уровень параллелизма по сравнению с единственным мьютексом. Так достигли мы своей цели или нет? Да, безусловно — теперь разные потоки могут одновременно работать с разными узлами списка, выполняя разные функции:

for_each()
для обработки каждого узла,
find_first_if()
для поиска или
remove_if()
для удаления элементов. Но, поскольку мьютексы узлов захватываются по порядку, потоки не могут обгонять друг друга. Если какой-то поток тратит много времени на обработку конкретного узла, то, дойдя до этого узла, остальные потоки вынуждены будут ждать.

6.4. Резюме

В начале этой главы мы обсудили, что понимается под проектированием структуры данных, допускающей распараллеливание, и сформулировали несколько рекомендаций. Затем на примере нескольких широко распространенных структур данных (стек, очередь, хеш-таблица и связанный список) мы видели, как эти рекомендации применяются на практике — обеспечивают параллельный доступ с применением блокировок и предотвращают гонки. Теперь вы можете проанализировать дизайн своих структур данных и подумать, где в нем есть возможности для распараллеливания, а где возможны состояния гонки.

В главе 7 мы узнаем, как можно, не отходя от сформулированных рекомендаций, вообще обойтись без блокировок, применяя для обеспечения необходимых ограничений на упорядочение низкоуровневые атомарные операции.

Загрузка...