Прежде, чем мы начнем обсуждать потоки, процессы, кванты времени и другие замечательные «концепции диспетчеризации», давайте поговорим об аналогиях.
Сначала я хотел бы проиллюстрировать, как функционируют потоки и процессы. На мой взгляд, лучший способ (о глубинном изучении систем реального времени сейчас речь не идет) — это вообразить поведение наших потоков и процессов в некоторой привычной для нас обстановке.
Давайте используем для построения аналогий о процессах и потоках объект, который мы используем повседневно — наш собственный дом.
Дом реально представляет собой контейнер с некоторыми атрибутами (общая площадь дома, число спален, и т.д.).
Если рассматривать жилой дом с этой точки зрения, он ничего не делает сам по себе. Дом — пассивный объект, в этом он аналогичен процессу. Поговорим об этом вкратце.
Люди, живущие в доме, суть активные объекты — они живут в комнатах, просматривают телепрограммы, готовят пищу, принимают душ, и т.д. Скоро мы поймем, что потоки функционируют аналогично.
Если вы когда-либо жили в одиночестве, вы знаете, каково это — вы можете делать в доме все, что вы пожелаете и когда вы пожелаете, потому что в доме больше никого нет. Если вы пожелаете включить стерео, принять душ, приготовить обед, что угодно — вы просто идете и делаете это.
Ситуация в корне изменится, если вы введете в дом еще одного человека. Скажем, вы женитесь. Теперь у вас есть супруга, живущая в этом же доме вместе с вами. Теперь уже вы не сможете попасть в душ в любой момент времени — придется каждый раз сначала проверять, нет ли там вашей супруги.
Если вы оба — взрослые и ответственные люди, о вопросах безопасности обычно можно не беспокоиться. Вы будете уверены в том, что другой совершеннолетний человек будет уважать ваши правила, принципы и жизненное пространство и не попробует тайком поджечь кухню, и т.д.
А если теперь добавить в дом несколько детей — тут все станет еще интереснее.
Так же как и дом занимает некоторый участок земли в жилом массиве, так и процесс занимает некоторый объем памяти компьютера. Аналогично тому, как и обитатели в доме могут свободно войти в любую комнату, в которую пожелают, потоки в процессах все вместе имеют общий доступ к этой памяти. Если поток получает доступ к некоему объекту (мама покупает игрушку), все другие потоки немедленно получают к нему доступ, потому что этот объект существует в общем адресном пространстве — в доме. Аналогично, если процесс распределяет для себя память, эта память становится доступной для всех потоков. Хитрость здесь состоит в том, что необходимо знать, должна ли эта память быть доступной для всех потоков в процессе. Если это так, то доступ потоков к ней придется синхронизировать. Если это не так, то будем считать, что эта память относится к одному конкретному потоку. В этом случае, поскольку только один поток имеет доступ к этой памяти, можно считать, что синхронизация не потребуется — не будет же этот поток сам ставить себе подножки!
Из нашего повседневного опыта мы знаем, что вещи не так просты, как кажутся. Теперь, когда мы рассмотрели основные характеристики (резюме: любой объект является разделяемым!) давайте обратимся к более интересным ситуациям и выясним, чем же они так интересны.
На рисунке, представленном ниже, показано, как мы в дальнейшем будем представлять потоки и процессы. Процесс здесь — это круг, отображающий «контейнерную» концепцию (адресное пространство), а три ломаных линии — это потоки. Вы найдете найдете подобные иллюстрации далее во всех разделах этой книги.
Процесс как контейнер потоков.
Если вы хотите принять душ, и в доме есть еще кто-то, и этот кто-то уже в ванной, вам придется подождать. Как же поток функционирует в аналогичной ситуации?
Потоки используют то, что мы называем взаимным исключением (mutual exclusion). Означает это в значительной степени то, о чем вы и подумали — несколько потоков являются взаимно исключающими, когда речь идёт идет об определенном ресурсе.
Если вы хотите принять душ, это значит, что вы хотите получить эксклюзивный доступ к ванной комнате. Для этого вы должны сначала войти в ванную, а затем закрыть ее дверь изнутри. Если при этом данной ванной комнатой попытается воспользоваться кто-либо другой, его остановит запертая дверь. После того как вы закончили свои дела в ванной, вы откроете дверь и этим позволите еще кому-либо получить доступ в душ.
Именно так и поступает поток. Поток использует объект, называемый мутексом (сокращенно от MUTual Exclusion — взаимное исключение). Этот объект подобен замку в двери: как только поток заблокирует мутекс, никакой другой поток не сможет получить доступ к мутексу до тех пор, пока владеющий мутексом поток его не разблокирует — иными словами, мутекс будет удерживать другие потоки, подобно дверному замку.
Другая интересная параллель, которая проявляется как с мутексами, так и по аналогии с дверными замками, состоит в том, что мутекс является действительно «рекомендательной» блокировкой. Если поток не подчиняется правилам использования мутексов, то такая защита бессмысленна. В нашей аналогии с жилым домом эта ситуация подобна тому, как кто-либо вломился бы в ванную комнату через одну из стен, игнорируя соглашение о запертой двери.
А что если ванная комната в настоящее время заперта, и множество людей ожидают момента, чтобы ею воспользоваться? Очевидно, все они располагаются вне ее, ожидая, когда же тот, кто в ней находится, наконец выйдет. Закономерный вопрос: «А что произойдет, когда дверь откроется? Кто должен войти следующим?»
Можно предположить, что было бы «справедливым» позволить войти следующим тому, кто ожидает более длительное время. Или было бы «справедливо» позволить войти в ванную следующим тому, кто бы был, например, самый старший по возрасту, или самый высокий, или самый главный. Имеется множество способов определить то, что признавать «справедливым».
Применительно к потокам, мы решаем эту проблему с учетом только двух факторов: приоритета и продолжительности ожидания.
Предположим, что одновременно два человека оказываются у запертой двери в ванную комнату. Одного из них уже «поджимает» время (он опаздывает на совещание), в то время как другой тоже опаздывает, но не так уж сильно. Разве не имело бы смысл позволить тому, кого поджимает время, войти в ванную следующим? Разумеется, имело бы. Остается единственный вопрос о том, как вы принимаете решение о том, кто более «важен» в такой ситуации. Это можно сделать, например, назначив приоритет (давайте использовать номера приоритетов такие, какие приняты в QNX/Neutrino: для рассматриваемой версии QNX/Neutrino номер 1 — самый низкий, номер 63 — самый высокий). Людям в доме, которые имеют неотложные дела, следовало бы дать более высокий приоритет, а тем, у которых таких дел нет, — более низкий. Так же дела обстоят и с потоками. Если бы на момент разблокировки мутекса в ожидании находилось множество потоков, мы бы отдали этот мутекс ожидающему потоку с наивысшим приоритетом. Предположим, однако, что оба человека имеют тот же самый приоритет. Что делать? Хорошо, в этом случае было бы «справедливо» позволить человеку, который ожидал более длительное время, войти следующим. Это было бы не только «справедливо», но и так же, как это делает ядро в QNX/ Neutrino. В случае, когда в ожидании находится группа потоков, мы выстраиваем их сначала по приоритету, а уже в пределах каждого приоритета — по продолжительности ожидания.
Мутекс, конечно же, не единственное средство синхронизации из тех, которые нам доведется встретить. Давайте же рассмотрим и некоторые другие тоже.
Давайте переместимся из ванной комнаты на кухню, так как это социально адаптированное помещение для одновременного обитания более чем одного человека. На кухне вы можете не пожелать, чтобы все и каждый находились бы там одновременно. В действительности вы бы, вероятно, пожелали ограничить число людей на кухне (поваров, например).
Скажем, вы не хотите, чтобы на кухне находилось одновременно более двух человек. Смогли бы вы это реализовать с помощью мутекса? В пределах принятого определения — нет. Почему нет? Это действительно очень интересная проблема в нашей аналогии с домом. Давайте разобьем возникшую проблему на части и проанализируем ситуацию поэтапно.
В ванной комнате возможна одна из двух ситуаций, каждая из которых характеризуется двумя жестко взаимосвязанными состояниями:
• дверь открыта, и в ванной никого нет;
• дверь закрыта, и в помещении находится один человек.
Здесь никакая другая комбинация состояний невозможна — в пустом помещении дверь не может быть никем заперта изнутри (иначе как мы бы ее тогда открыли?), и дверь не может быть открыта кем-либо вне ванной (иначе как бы мы тогда обеспечили приватность использования?). Это и есть пример семафора с единичным значением счетчика — в помещении может находиться не более одного человека, или, иными словами, только один поток может использовать семафор.
Ключевым здесь (прошу прощения за каламбур) является подход к определению замка. В типовой ванной комнате вы сможете запереть и отпереть дверь только изнутри — снаружи средств для этого не предусмотрено. В действительности это означает что блокировка мутекса — это атомарная операция, и невозможна ситуация, в которой, пока вы находитесь в процессе блокировки мутекса, его заблокирует некоторый другой поток, так что в результате вы оба стали бы владельцами этого мутекса. В наше аналогии с жилым домом это не так очевидно — хотя бы потому, что люди гораздо умнее, чем нули и единицы.
Что нам действительно потребуется на кухне, так это замок другого типа.
Предположим, что мы установили в двери на кухне обычный, открываемый ключом замок. Принцип работы этого замка заключается в том, что если у вас есть ключ, вы можете отпереть дверь и войти. Любой, кто использует этот замок, должен быть согласен с тем, что, войдя, он немедленно запрет дверь изнутри, чтобы любому, кто находится вне кухни, для входа всегда требовался бы ключ.
Ну вот, теперь управлять количеством людей, которых мы пожелали бы одновременно видеть на кухне, становится весьма легким делом — достаточно просто повесить на дверь снаружи несколько ключей. Напоминаем, что кухня должна быть всегда закрыта! Когда кто-либо пожелает попасть на кухню, он увидит, что на двери кухни висит ключ. Если это так, он возьмет этот ключ, откроет им дверь, войдет внутрь и этим же ключом закроет дверь изнутри.
Поскольку человек, входящий на кухню, должен взять ключ с собой (без этого он просто не сможет закрыть дверь изнутри), получается, что, ограничивая число висящих снаружи ключей, мы можем непосредственно управлять количеством людей, которым позволено быть на кухне в любой заданный момент времени.
При операциях с потоками подобный механизм реализуется путем применения семафоров. «Простые» семафоры работают точно так же, как и мутексы. Вы либо являетесь владельцем мутекса — в этом случае вы имеете доступ к ресурсу, — или нет — тогда вы не имеете доступа. Семафор, описанный выше в аналогии с доступом на кухню, является семафором со счетчиком. Такой семафор отслеживает состояние своего внутреннего счетчика обращений (т.е. число ключей, доступных потокам).
Мы только что задали себе вопрос: «Смогли бы мы реализовать блокировку со счетом с помощью мутекса?» Ответ был отрицательный. А если наоборот? Смогли бы мы использовать семафор в качестве мутекса?
Да, смогли бы. В действительности в некоторых операционных системах так все и делается — никаких мутексов, одни семафоры! Зачем тогда вообще беспокоиться о мутексах?
Для того чтобы ответить на этот вопрос, рассмотрим ситуацию в нашей аналогии с ванной комнатой. Как строитель вашего дома реализовал мутекс? Я подозреваю, что в вашем доме нет ключей, которые вешались бы на двери снаружи.
Мутексы — это семафоры «специального назначения». Если вы пожелаете, чтобы в определенном месте программы выполнялся только один поток, эффективнее всего было бы реализовать это при помощи мутекса.
Позже мы рассмотрим и другие способы синхронизации потоков — объекты, которые называются условными переменными (condvar), барьерами (barrier) и ждущими блокировками (sleepon).
Чтобы не возникло путаницы, необходимо также иметь в виду, что мутекс имеет и другие свойства (например наследование приоритетов), отличающие его от семафора.
Наша аналогия с процессами в жилом доме прекрасна для объяснения концепций синхронизации, но бесполезна при анализе одной очень важной проблемы. В доме у нас было много потоков, работающих одновременно. Однако в реальной жизненной ситуации обычно имеется только один процессор, так что только один объект может реально работать в одно и то же время.
Давайте рассмотрим, что происходит в реальном мире, и особенно в ситуации «экономии», где в системе есть только один процессор. В этом случае, поскольку имеется только один процессор, в любой заданный момент времени может выполняться только один поток. Ядро решает (с учетом ряда правил, которые мы кратко рассмотрим), какой поток должен выполняться, и запускает его.
Если вы покупаете систему, в которой имеется множество идентичных процессоров, совместно использующих одну и ту же память и устройства, это означает, что у вас есть блок SMP. (SMP расшифровывается как «Symmetrical Multi-Processor» — «симметричный мультипроцессор»; с помощью слова «симметричный» подчеркивается, что все центральные процессоры, применяемые в системе, являются идентичными.) В таком случае число потоков, которые могут работать одновременно, ограничено количеством процессоров. (Кстати, в случае с одним процессором была та же самая ситуация!) Поскольку каждый процессор может одновременно обрабатывать только один поток, в ситуации с применением множества процессоров несколько потоков могут работать одновременно. Давайте пока абстрагируемся от числа процессоров в системе — при проектировании системы бывает полезно считать, что несколько потоков могут выполняться одновременно, даже если это и не происходит в реальной ситуации. Несколько позже в разделе «На что следует обратить внимание при использовании SMP» мы рассмотрим кое-какие неочевидные особенности симметричного мультипроцессирования.
Так кто же определяет, который из потоков должен выполняться в данный момент времени? Этим занимается ядро.
Ядро определяет, который из потоков должен использовать процессор в данный момент времени и переключает контекст на этот поток. Давайте посмотрим, что ядро при этом делает с процессором.
Процессор имеет несколько регистров (точное их число зависит от принадлежности процессора к серии, например, сравните процессор x86 с процессором MIPS, а характерный представитель серии, например, процессор 80486 — с процессором Pentium). В тот момент, когда поток выполняется, информация о нем хранится в указанных регистрах (например, данные о размещении программы в памяти).
Когда же ядро принимает решение о том, что должен выполняться другой поток, оно должно сделать следующее:
1. Сохранить текущее состояние регистров активного потока и другую контекстную информацию.
2. Записать в регистры информацию для нового потока, а также загрузить новый контекст.
Как ядро принимает решение о том, что должен выполняться другой поток? Оно анализирует, действительно ли в данный момент времени данный поток готов к использованию процессора. Когда мы обсуждали, например, мутексы, мы говорили о состояниях блокировки (это происходило в тех случаях, когда поток пытался завладеть мутексом, уже принадлежащим другому потоку, и поэтому блокировался). Таким образом, с точки зрения ядра мы имеем один поток, который может использовать процессор, и другой поток, которые не может этого делать, потому что он заблокирован в ожидании мутекса. В этом случае ядро предоставляет процессор потоку, который готов к работе, а другой поток заносит в свой внутренний список (чтобы можно было отслеживать отслеживал запрос потока на мутекс).
Очевидно, это не очень-то интересная ситуация. Предположим, что готовы к выполнению сразу несколько потоков. Вспомним, не мы ли делегировали доступ к мутексу на основе приоритета и продолжительности ожидания? Ядро тоже использует подобную схему для определения того, который из потоков должен работать следующим. При этом играют роль два фактора: приоритет и дисциплина диспетчеризации. Рассмотрим их по очереди.
Рассмотрим два готовых к выполнению потока. Если эти поток имеют различные приоритеты, то весьма прост — ядро отдает процессор потоку с высшим приоритетом. Приоритеты в QNX/ Neutrino пронумерованы от единицы (самый низкий) и далее, в единичным дискретом — так же, как это было упомянуто в обсуждении получения мутекса. Заметьте, что нулевой приоритет использовать нельзя — он зарезервирован для «холостого» (idle) потока (на профессиональном жаргоне часто называемого «холодильником» — прим. ред.). (Если вы захотите узнать минимальное или максимальное значение приоритета, определенное для вашей системы, используйте функции sched_get_priority_min() и sched_get_priority_max() — они описаны в
. В данной книге мы будем предполагать, что приоритет 1 является самым низким, а 63 самым высоким.
Если другой поток с более высоким приоритетом вдруг становится готов к выполнению, ядро немедленно переключит контекст на поток с более высоким приоритетом. Это называется вытеснением— поток с высшим приоритетом вытесняет поток с низшим приоритетом. Когда поток с высшим приоритетом заканчивает свою работу, и ядро переключает контекст обратно на поток с низшим приоритетом, который выполнялся ранее, мы называем это возобновлением— ядро возобновляет работу предыдущего потока.
Теперь предположим, что не один, а два потока готовы к выполнению и имеют один и тот же приоритет.
Предположим, что в данное время выполняется один из потоков, Рассмотрим правила, которые используются ядром при принятии решения о переключении контекста в такой ситуации. (Разумеется, все это обсуждение в действительности применимо только к потокам с одинаковыми приоритетами — как только будет готов к выполнению поток с высшим приоритетом, процессор будет отдан ему. В этом вся суть приоритетов в операционной системе реального времени.)
Ядро QNX/Neutrino поддерживает две дисциплины диспетчеризации: карусельную, она же RR (Round Robin), и FIFO (First In — First Out).
При диспетчеризации FIFO процессор предоставляется потоку на столько времени, сколько ему необходимо. Это означает, что если один поток занят длительными вычислениями, и никакой другой поток с более высоким приоритетом не готов к выполнению, то этот поток потенциально может выполняться вечно. А как же потоки с тем же приоритетом? Они будут заблокированы тоже. (То, что в этот же момент потоки с более низким приоритетом будут заблокированы, должно быть очевидно.)
Если работающий поток завершает свою работу или добровольно уступает процессор, ядро анализирует состояние других потоков того же самого приоритета на готовность их к выполнению. Если таковых не имеется, то ядро анализирует потоки с более низким приоритетом, готовые к выполнению. Заметьте, что выражение «добровольно уступить процессор» может означать одну из двух возможных ситуаций. Если поток переходит в режим ожидания, блокируется на семафоре, и т.д., тогда — да, может выполняться поток с более низким приоритетом (как описано выше). Но существует также специальная функция sched_yield() (базированная на системном вызове SchedYield()), по которому процессор передается только другому потоку с тем же самым приоритетом — если бы был готов к выполнению поток с высшим приоритетом, у потока с низшим приоритетом все равно не было бы никаких шансов получить управление. Если поток вызывает функцию sched_yield(), но никакой другой поток с таким же самым приоритетом не готов к выполнению, первоначальный поток продолжает работу. В реальности, функция sched_yield() применяется для того, чтобы дать шанс другому потоку с таким же самым приоритетом получить доступ к процессору.
На рисунке, приведенном ниже, мы видим три потока, размещенных в двух различных процессах:
Три потока в двух различных процессах.
Если мы предположим, что потоки «А» и «В» находятся в состоянии READY («готов»), что поток «С» блокирован (возможно, в ожидании мутекса), а другой поток «D» (не показан) в настоящее время выполняется, то очередь готовности, которую поддерживает ядро QNX/Neutrino, будет выглядеть следующим образом:
Два потока в очереди готовности, один блокирован, один выполняется.
На рисунке иллюстрируется внутренняя очередь готовности, которую использует ядро при принятии решения о том, кого запланировать на выполнение следующим. Заметьте, что поток «С» не находится в очереди готовности, потому что он блокирован, и поток «D» также не находится в этой очереди, потому что он уже выполняется.
Дисциплина RR (карусельная диспетчеризация) аналогична дисциплине диспетчеризации FIFO, за исключением того, что поток не будет работать бесконечно, если имеется другой поток с тем же самым приоритетом. Поток будет работать только в течение предопределенного кванта времени (который фиксирован и не может быть изменен). Вы можете узнать величину кванта времени, используя функцию sched_rr_get_interval().
Когда ядро запускает на обработку поток с дисциплиной диспетчеризации RR, оно засекает время. Если поток не блокируется в течение выделенного ему кванта времени, квант времени истечет. Тогда ядро проверяет наличие другого готового к выполнению потока с тем же самым приоритетом. Если такой поток обнаруживается, то ядро активирует его. Если такого потока нет, то ядро снова ставит на выполнение предыдущий поток (то есть ядро выделяет потоку для работы еще один квант времени).
Давайте сделаем сводку правил диспетчеризации (для одиночного процессора) и отсортируем их в порядке важности:
• только один поток может выполняться в данный момент времени;
• всегда должен выполняться поток с наивысшим авторитетом;
• поток должен работать до тех пор, пока он не блокируется иди не завершается;
• поток, диспетчеризуемый по дисциплине карусельного типа (RR), должен работать в течение выделенного ему кванта времени, после чего ядро обязано его перепланировать (при необходимости).
Для систем с несколькими процессорами, приведенные выше правила остаются такими же, за исключением того, что несколько процессоров могут одновременно выполнять несколько потоков. Порядок, в котором потоки выполняются (то есть последовательность, в которой потоки ставятся на выполнение в многопроцессорной системе), определяется точно так же, как для одиночного процессора — в любой момент времени будет выполняться готовый к выполнению поток с наивысшим приоритетом. Если существует другой готовый к выполнению поток с более высоким приоритетом, и имеется доступный процессор, то этот поток будет выполняться на следующем процессоре, и так далее. Если имеющегося числа потоков недостаточно для того, чтобы загрузить все процессоры по такому принципу, то нет проблем — «неактивные» процессоры будут выполнять «холостой» поток (его приоритет равен нулю, то есть ниже, чем приоритет любого пользовательского потока) Если для того, чтобы обработать всю очередь, недостаточно процессоров, тогда только N потоков с наивысшим приоритетом будут выполняться, где N — число доступных процессоров. Другие потоки будут готовы к выполнению, но в действительности выполняться не будут. Отметим, что вопросы диспетчеризации потоков в симметричной мультипроцессорной системе все еще исследуются, так что возможно, что этот порядок может измениться в будущем.
Схема алгоритма диспетчеризации.
Несколько раз небрежно упомянув о «выполнении», «готовности» и «блокировке», давайте теперь формализуем эти состояния потока.
Состояние выполнения (RUNNING) в QNX/Neutrino означает, что поток активно использует ресурсы процессора. В системе SMP будет осуществляться выполнение множества потоков, а в системе с единственным процессором будет осуществляться выполнение одного потока.
Состояние готовности (READY) означает, что этот поток может быть поставлен на выполнение немедленно, но не выполняется, потому что в данный момент времени активен другой поток (с таким же или более высоким приоритетом). Если бы два потока были готовы к выполнению, один из них с приоритетом 10, а другой — с приоритетом 7, то поток с приоритетом 10 был бы переведен в состояние выполнения (RUNNING), а поток с приоритетом 7 — в состояние готовности (READY).
Что называется блокированным состоянием? Проблема здесь состоит в том, что блокированных состояний существует несколько. Реально в QNX/Neutrino имеется более дюжины блокированных состояний.
Почему так много? Потому что ядро отслеживает причину, по которой поток заблокирован.
Мы уже ознакомились с двумя типами блокирующих состояний: когда поток заблокирован в ожидании мутекса, этот поток находится в состоянии блокировки по мутексу (MUTEX). Когда поток заблокирован, ожидая семафор, он находится в состоянии блокировки по семафору (SEM). Эти состояния просто указывают, в очереди на какой ресурс поток заблокирован.
Если по мутексу заблокировано несколько потоков, ядро не уделит им никакого внимания до тех пор, пока поток, который владеет мутексом, не освободит его. Как только это произойдет, один из блокированных потоков будет переведен в состояние готовности (READY), и ядро при необходимости примет решение о перепланировании.
Почему «при необходимости»? У потока, который только что освободил мутекс, вполне могут быть и другие дела, и он может иметь более высокий приоритет, чем все остальные ожидающие процессор потоки. В этом случае мы следуем второму правилу, которое гласит: «всегда должен выполняться поток с наивысшим приоритетом», что означает, что порядок диспетчеризации не изменяется — поток с наивысшим приоритетом продолжает работать.
Ниже представлен полный список блокированных состояний с краткими пояснениями. Этот список, кстати, есть в заголовочном файле
, только там эти состояния снабжены префиксом «STATE_» (например, состояние READY данной таблицы там будет звучать как STATE_READY).
Если состояние потока: | To это значит, что: |
---|---|
DEAD | Поток «мертв», ядро ожидает освобождения занятых им ресурсов. (В классических UNIX системах это состояние также называют «zombie» — «зомби» — прим. ред.) |
RUNNING | Поток выполняется. |
READY | Поток не выполняется, но готов к работе (работает один или более потоков с более высокими или равными приоритетами). |
STOPPED | Поток приостановлен (по сигналу SIGSTOP |
SEND | Поток ожидает приема своего сообщения сервером. |
RECEIVE | Поток ожидает сообщение от клиента. |
REPLY | Поток ожидает от сервера ответ на свое сообщение. |
STACK | Поток ожидает распределения дополнительного стекового пространства. |
WAITPAGE | Поток ожидает устранения администратором процессов повреждения на странице. |
SIGSUSPEND | Поток ожидает сигнал. |
SIGWAITINFO | Поток ожидает сигнал. |
NANOSLEEP | Поток «спит» (приостановлен на определенный период времени). |
MUTEX | Поток ожидает захват мутекса. |
CONDVAR | Поток ожидает соблюдения условия условной переменной. |
JOIN | Поток ожидает завершения другого потока. |
INTR | Поток ожидает прерывание. |
SEM | Поток ожидает захват семафора. |
Важно помнить о том, что когда поток блокирован, независимо от состояния блокировки, он не потребляет ресурсы процессора. Наоборот, единственным состоянием, в котором поток потребляет ресурсы процессора, является состояние выполнения (RUNNING).
Мы рассмотрим блокированные состояния SEND (блокировка по передаче), RECEIVE (блокировка по приему) и REPLY (блокировка по ответу) в главе «Обмен сообщениями». Состояние NANOSLEEP связано с применением функций типа sleep(), которые мы рассмотрим в главе «Часы, таймеры и периодические уведомления». Состояние INTR связано с использованием функции InterruptWait(), которую мы изучим в главе «Прерывания». Большинство всех прочих состояний обсуждается в данной главе.
Вернемся к нашим рассуждениям о потоках и процессах, но на сей раз с точки зрения перспективы их применения в системах реального времени. Затем мы рассмотрим вызовы функций, которые применяются при работе с потоками и процессами.
Мы знаем, что процесс может содержать один или больше потоков. (Процесс с нулевым числом потоков не был бы способен что-либо делать: если в доме никого нет, выполнять какую-либо полезную работу просто некому.) В операционной системе QNX/Neutrino допускается один или более процессов. (Аналогично — QNX/Neutrino с нулевым количеством процессов просто не сможет ничего сделать.)
Что же делают все эти процессы и потоки? В конечном счете, они формируют систему — собрание потоков и процессов, реализующих определенную цель.
На самом высоком уровне абстракции система состоит из множества процессов. Каждый процесс ответственен за обеспечение служебных функций определенного характера, независимо от того, является ли он элементом файловой системы, драйвером дисплея, модулем сбора данных, модулем управления или чем-либо еще.
В пределах каждого процесса может быть множество потоков. Число потоков варьируется. Один разработчик ПО, используя только единственный поток, может реализовать те же самые функциональные возможности, что и другой, использующий пять потоков. Некоторые задачи сами по себе приводят к многопоточности и дают относительно простые решения, другие в силу своей природы, являются однопоточными, и свести их к многопоточной реализации достаточно трудно.
Проблемы разработки ПО с применением потоков могли легко стать темой отдельной книги. Здесь же мы изложим только основы этой проблемы.
Почему же не взять просто один процесс с множеством потоков? В то время как некоторые операционные системы вынуждают вас программировать только в таком варианте, возникает ряд преимуществ при разделении объектов на множество процессов.
К таким преимуществам относятся:
• возможность декомпозиции задачи и модульной организации решения;
• удобство сопровождения;
• надежность.
Концепция разделения задачи на части, т.е., на несколько независимых задач, является очень мощной. И именно такая концепция лежит в основе QNX/Neutrino. Операционная система QNX/Neutrino состоит из множества независимых модулей, каждый из которых наделен некоторой зоной ответственности. Эти модули независимы и реализованы в отдельных процессах. Разработчики из QSSL использовали эту удобную особенность для отдельной разработки модулей, независимых друг от друга. Единственная возможная установить зависимость этих модулей друг от друга — наладить между ними информационную связь с помощью небольшого количества строго определенных интерфейсов.
Это естественно ведет к упрощению сопровождения программных продуктов, благодаря незначительному числу взаимосвязей. Поскольку каждый модуль четко определен, и устранять неисправности в одном таком модуле будет гораздо проще — тем более, что он не связан с другими.
Тем не менее, наиболее важным моментом является надежность. Процесс, точно так же, как и жилой дом, имеет некоторые четкие «границы». Человек, живущий в доме, точно знает, когда он в доме, а когда — нет. Поток наделен в этом смысле пониманием, что если у него есть доступ к памяти в пределах процесса, он может функционировать. Если он переступит границы адресного пространства процесса, он будет уничтожен. Это означает, что два потока, работающие в различных процессах, изолированы один от другого.
Защита памяти.
Адресные пространства процессов устанавливаются и поддерживаются модулем администратора процессов QNX/ Neutrino. При запуске процесса администратор процессов распределяет ему некоторый объем памяти и активирует его потоки. Отведенная данному процессу память помечается как принадлежащая ему.
Это означает, что если в данном процессе имеются есть несколько потоков, и ядру необходимо переключить контекст между ними, это можно сделать очень эффективно, поскольку не нужно изменять адресное пространство, достаточно просто сменить рабочий поток. Если, однако, мы должны переключиться на другой поток в другом процессе, тут уже включается в работу администратор процессов и переключает адресное пространство. Пусть вас не беспокоят возникающие при этом дополнительные издержки — под управлением QNX/Neutrino все это осуществляется очень быстро.
Теперь обратим внимание на функции, предназначенные для работы с потоками и процессами. Любой поток может осуществить запуск процесса; единственные налагаемые здесь ограничения вытекают из основных принципов защиты (правила доступа к файлу, ограничения на привилегии и т.д.). По всей вероятности, вам уже доводилось запускать процессы — либо из системного сценария, либо из командного интерпретатора, или из программы от своего имени.
Например, при запуске процесса из командного интерпретатора вы можете ввести командную строку:
$ program1
Это предписывает командному интерпретатору запустить программу
program1
и ждать завершения ее работы. Или, вы могли набрать:
$ program2 &
Это предписывает командному интерпретатору запустить программу
program2
без ожидания ее завершения. В таком случае говорят, что программа program2
работает в фоновом режиме.
Если вы пожелаете скорректировать приоритет программы до ее запуска, вы можете применить команду
nice
— точно так же, как в UNIX:
$ nice program3
Это предписывает командному интерпретатору запустить программу
program3
с заниженным приоритетом.
Или нет?
Если посмотреть, что происходит в действительности, то мы велели командному интерпретатору выполнить программу, называемую
nice
, с обычным приоритетом. Команда nice
затем занизила свой собственный приоритет (отсюда и имя программы «nice» — «благовоспитанная») и затем запустила программу program3
с этим заниженным приоритетом.
Нас обычно не заботит тот факт, что командный интерпретатор создает процессы — это просто подразумевается. В некоторых прикладных задачах можно положиться на сценарии командного интерпретатора (пакеты команд, записанные в файл), которые сделают эту работу за вас, но в ряде других случаев вы пожелаете создавать процессы самостоятельно.
Например, в большой мультипроцессорной системе вы можете пожелать, чтобы одна главная программа выполнила запуск всех других процессов вашего приложения на основании некоторого конфигурационного файла. Другим примером может служить необходимость запуска процессов по некоторому событию.
Рассмотрим некоторые из функций, которые QNX/Neutrino обеспечивает для запуска других процессов (или подмены одного процесса другим):
• system();
• семейство функций exec();
• семейство функций spawn();
• fork();
• vfork().
Какую из этих функций применять, зависит от двух требований: переносимости и функциональности. Как обычно, между этими двумя требованиями возможен компромисс.
Обычно при всех запросах на создание нового процесса происходит следующее. Поток в первоначальном процессе вызывает одну из вышеприведенных функций. В конечном итоге, функция заставит администратор процессов создать адресное пространство для нового процесса. Затем ядро выполнит запуск потока в новом процессе. Этот поток выполнит несколько инструкций и вызовет функцию main(). (Конечно, в случае вызова функции fork() или vfork() новый поток начнет выполнение в новом процессе с возврата из fork() или vfork(), соответственно — как иметь с этим дело, мы вкратце рассмотрим ниже).
Функция system() — самая простая функция; она получает на вход одну командную строку, такую же, которую вы набрали бы в ответ на подсказку командного интерпретатора, и выполняет ее.
Фактически, для обработки команды, которую вы желаете выполнить, функция system() запускает копию командного интерпретатора.
Редактор, который я использую при написании данной книги, использует вызов system(). При редактировании мне может понадобиться выйти в командный интерпретатор, проверить некоторые фрагменты программ, и затем снова вернуться в редактор. Все это необходимо сделать, не потеряв позицию курсора в тексте. В этом редакторе я, к примеру, могу дать команду «:!pwd» для отображения текущего рабочего каталога. Редактор при этом выполнит следующий код:
system("pwd");
Подходит ли функция system() для всех дел в Поднебесной? Конечно же, нет. Однако, ее применение может быть очень полезно для множества задач, требующих создания процессов.
Давайте рассмотрим ряд других функций создания процессов.
Следующие функции создания процессов, которые следует рассмотреть, принадлежат к семействам exec() и spawn(). Прежде, чем мы обратимся к подробностям их применения, рассмотрим суть различий между этими двумя группами функций.
Семейство функций exec() подменяет текущий процесс другим. Я подразумеваю под этим то, что когда процесс вызывает функцию семейства exec(), этот процесс прекращает выполнение текущей программы и начинает выполнять другую. Идентификатор процесса (PID) при этом не меняется, просто процесс преобразуется в другую программу. Что произойдет с потоками в данном процессе? Мы вернемся к этой теме после того, как рассмотрим функцию fork().
С другой стороны, семейство функций spawn() так не делает. Вызов функции семейства spawn() создает другой процесс (с новым идентификатором), который соответствует программе, указанной в аргументах функции.
Spawn | POSIX | Exec | POSIX |
---|---|---|---|
spawn() | Да | ||
spawnl() | Нет | execl() | Да |
spawnle() | Нет | execle() | Да |
spawnlp() | Нет | execlp() | Да |
spawnlpe() | Нет | execlpe() | Нет |
spawnp() | Да | ||
spawnv() | Нет | execv() | Да |
spawnve() | Нет | execve() | Да |
spawnvp() | Нет | execvp() | Да |
spawnvpe() | Нет | execvpe() | Нет |
Рассмотрим различные варианты функций exec() и spawn(). В таблице, представленной ниже, вы увидите, что некоторые функции из них предусмотрены POSIX, а некоторые — нет. Конечно, для максимальной переносимости, следует использовать только POSIX-совместимые функции.
При том, что названия функций могут показаться малопонятными, в их суффиксах есть логика.
Суффикс: | Смысл: |
l (нижний регистр «L») | Список аргументов определяется через список параметров, заданный непосредственно в самом вызове и завершаемый нулевым аргументом NULL. |
е | Указывается окружение. |
p | Если не указано полное имя пути программы, для ее поиска используется переменная окружения PATH. |
v | Список аргументов определяется через указатель на вектор (массив) аргументов. |
Список аргументов здесь — список аргументов командной строки, передаваемых программе.
Заметьте, что в библиотеке языка Си функции spawnlp(), spawnvp() и spawnlpe() все вызывают функцию spawnvpe(), которая, в свою очередь, вызывает POSIX-функцию spawnp(). Функции spawnle(), spawnv() и spawnl() все в конечном счете вызывают функцию spawnve(), которая затем вызывает POSIX-функцию spawn(). И, наконец, POSIX-функция spawnp() вызывает POSIX-функцию spawn(). Таким образом, в основе всех возможностей семейства spawn() лежит сам вызов spawn().
Рассмотрим теперь различные варианты функций spawn() и exec() более подробно так, чтобы вы смогли получить навык свободного использования различных суффиксов. Затем мы перейдем непосредственно к рассмотрению вызова функции spawn().
Например, если я хочу вызвать команду
ls
с аргументами -t
, -r
, и -l
(означает — «сортировать выходные данные по времени в обратном порядке и показывать выходные данные в длинном формате»), я мог бы определить это в программе так:
/* Вызвать ls и продолжить выполнение */
spawnl(P_WAIT, "/bin/ls", "/bin/ls", "-t", "-r", "-l",
NULL);
/* Заменить себя на ls */
execl(P_WAIT, "/bin/ls", "/bin/ls", "-t", "-r", "-l",
NULL);
Или, вариант с применением суффикса v:
char *argv[] = {
"/bin/ls",
"-t",
"-r",
"-l",
NULL
};
/* Вызвать ls и продолжить выполнение */
spawnv(P_WAIT, "/bin/ls", argv);
/* Заменить себя на ls */
execv(P_WAIT, "/bin/ls", "/bin/ls", argv);
Почему именно такой выбор? Он дан для удобства восприятия. У вас может быть синтаксический анализатор, уже встроенный в вашу программу, и может быть удобно сразу оперировать массивами строк. В этом случае я бы рекомендовал применять варианты с суффиксом «v». Или вам может понадобиться запрограммировать вызов программы, когда вам известно, где он находится и какие имеет параметры. В этом случае, зачем вам утруждать себя созданием массива строк, когда вы знаете точно, какие нужны аргументы? Просто передайте их варианту функции с суффиксом «l».
Отметим, что мы передаем реальное имя пути программы (/bin/ls), а затем имя программы еще раз в качестве первого аргумента. Это делается для поддержки программ, которые ведут себя по-разному в зависимости от того, под каким именем они были вызваны.
Например, GNU-утилиты компрессии и декомпрессии (gzip и gunzip) фактически привязаны к одному и тому же исполняемому модулю. Когда исполняемый модуль стартует, он анализирует аргумент argv[0] (передаваемый функции main()) и принимает решение, следует ли выполнять компрессию или декомпрессию.
Варианты с суффиксом «е» передают программе окружение. Окружение — это только своего рода «контекст», в котором работает программа. Например, у вас может быть программа проверки орфографии, у которой есть эталонный словарь. Вместо описания каждый раз в командной строке местоположения словаря вы могли бы сделать это в окружении:
$ export DICTIONARY=/home/rk/.dict
$ spellcheck document.1
Команда
export
предписывает командному интерпретатору создать новую переменную окружения (в нашем случае DICTIONARY
) и присвоить ей значение (/home/rk/.dict
).
Если вы когда-либо хотели бы использовать различные словари, вы были бы должны изменить среду до выполнения программы. Это просто сделать из оболочки:
$ export DICTIONARY=/home/rk1.altdict
$ spellcheck document.1
Но как сделать это из ваших собственных программ? Для того чтобы применять «e»-версии функций spawn() и exec(), вам следует определить массив строк, представляющих собой окружение:
char *env[] = {
"DICTIONARY=/home/rk/.altdict",
NULL
};
// Запуск проверки в отдельном процессе:
spawnle(P_WAIT, "/usr/bin/spellcheck",
"/usr/bin/spellcheck", "documents.1", NULL, env);
// Запуск проверки вместо себя:
execle("/usr/bin/spellcheck", "/usr/bin/spellcheck",
"document.1", NULL, env);
Версии с суффиксом «p» будут искать исполняемый модуль программы в списке каталогов, приведенном в переменной окружения PATH. Вы, вероятно, отметили, что во всех примерах местоположение исполняемых модулей строго определено —
/bin/ls
и /usr/bin/spellcheck
. А как быть с другими исполняемыми модулями? Если вы не хотите сразу определить точный путь к нужной программе, было бы лучше сделать так, чтобы места поиска исполняемых модулей вашей программе сообщил пользователь. Стандартная системная переменная PATH для этого и предназначена. Ниже приведено ее значение для минимальной системы:
PATH=/proc/boot:/bin
Это сообщает командному интерпретатору, что когда я набираю команду, он в первую очередь должен просмотреть каталог
/proc/boot/
, и если не сможет найти команду там, то должна просмотреть каталог бинарных файлов /bin
. Переменная PATH представляет собой разделяемый двоеточиями список каталогов для поиска команд. К переменной PATH вы можете добавлять столько элементов, сколько пожелаете, но имейте в виду, что при поиске файла будут проанализированы все элементы (в приведенной последовательности).
Если вы не знаете путь к выполнимой программе, вы можете использовать варианты с суффиксом «p».
Например:
// Использование явного пути:
execl("/bin/ls", "/bin/ls", "-l", "-t", "-r", NULL);
// Поиск пути в PATH:
execp("ls", "ls", "-l", "-t", "-r", NULL) ;
Если функция execl() не сможет найти
ls
в /bin
, она завершится с ошибкой. Функция execlp() просмотрит все каталоги, указанные в PATH, в поисках ls
, и завершится с ошибкой только в том случае, если не сможет найти ls ни в одном из этих каталогов. Это также прекрасная вещь для многоплатформенной поддержки — вашей программе не обязательно знать имена каталогов, принятых на разных машинах, она просто выполнит поиск.
А что произойдет, если сделать так?
execlp("/bin/ls", "ls", "-l", "-t", "-r", NULL);
Выполняет ли этот вызов поиск в окружении? Нет. Вы явно указали execlp() имя пути, что отменяет правило поиска в PATH. Если
ls
не будет найдена в /bin
(а здесь это будет именно так), то никаких других попыток поиска не выполняется — эта ситуация подобна варианту с использованием функции execl()).
Опасно ли смешивать явный путь с простым именем команды (например, указывать путь как
/bin/ls
, а имя — как ls
вместо /bin/ls
)? Обычно нет, потому что:
• значительное число программ так или иначе игнорирует
argv[0]
;
• те программы, поведение которых зависит от их имени, обычно вызывают функцию basename(), которая удаляет каталоговую часть
argv[0]
и возвращает только имя.
Единственная обоснованная причина использования полного имени пути в качестве первого параметра заключается в том, что программа может выводить диагностические сообщения, содержащие этот первый параметр, который немедленно укажет вам, откуда она была вызвана. Это может быть важно, если копии программы располагаются в нескольких каталогах из перечисленных в PATH.
Функции семейства spawn() имеют дополнительный параметр; во всех приведенных выше примерах я всегда указывал P_WAIT. Имеются четыре флага, которые вы можете придать функции spawn(), чтобы изменить ее поведение:
P_WAIT | Вызывающий процесс (ваша программа) будет блокирован до тех пор, пока вновь созданный процесс не отработает и не завершится. |
P_NOWAIT | Вызывающая программа не будет блокирована на время выполнения вновь созданной. Это позволяет вам запустить программу в фоновом режиме и продолжать выполнение, пока она делает свое дело. |
P_NOWAITO | Аналогично P_NOWAIT за исключением того, что устанавливается флаг SPAWN_NOZOMBIE. Это означает, что вы не должны беспокоить себя вызовом функции waitpid() для очистки кода завершения процесса. |
P_OVERLAY | Этот флаг превращает вызов функции spawn() в соответствующей вызов exec()! Ваша программа преобразуется в указанную программу без изменения идентификатора процесса ID. Вообще-то, если вы хотите сделать именно так, то, наверное, будет более корректно использовать вызов exec(), поскольку это избавит будущих читателей ваших исходных текстов от необходимости искать P_OVERLAY в справочном руководстве по библиотеке языка Си! |
Как мы упомянули выше, все функции семейства spawn(), в конечном счете, вызывают базовую функцию spawn(). Ниже приведен прототип функции spawn():
#include
pid_t spawn(const char *path, int fd_count,
const int fd_map[], const struct inheritance *inherit,
char* const argv[], char* const envp[]);
Мы можем не обращать внимание на параметры path, argv, и envp — мы уже рассмотрели их выше как местоположение исполняемого модуля (path), вектор параметров (argv) и окружение (envp).
Параметры fd_count и fd_map идут вместе. Если вы задаете нуль в fd_count, тогда fd_map игнорируется, и это означает, что вновь создаваемый процесс унаследует от родительского все дескрипторы файлов (кроме тех, которые модифицированы флагом FD_CLOEXEC функции fcntl()). Если параметр fd_count имеет ненулевое значение, то он задает число дескрипторов файлов, содержащихся в fd_map, и будут унаследованы только они.
Параметр inherit — это указатель на структуру, которая содержит набор флагов, маски сигналов, и т.д. Для получения более подробной информации об этом вам следует обратиться за помощью к справочному руководству по библиотеке языка Си.
Предположим, что вы решили создать новый процесс, который был бы идентичен работающему в настоящее время процессу, и сделать это так, чтобы эти два процесса выполнялись одновременно. Вы могли бы решить эту проблему с помощью функции spawn() (и параметра P_NOWAIT), передав вновь создаваемому процессу достаточно информации о точном состоянии вашего процесса, чтобы новый процесс мог настроить себя сам. Однако, такой подход может оказаться чрезвычайно сложным, потому что описание «текущего состояния» процесса может потребовать большого количества данных.
Существует более простой способ — применение функции fork() которая просто копирует текущий процесс. У результирующего процесса как код, так и данные полностью совпадают с таковыми для родительского процесса.
Конечно же, невозможно создать процесс, который во всем был бы идентичен родительскому. Почему? Наиболее очевидное различие между этими двумя процессами должно быть в идентификаторе процесса — мы не можем создать два процесса с одним и тем же идентификатором. Если вы посмотрите документацию на функцию fork() в справочном руководстве по библиотеке Си, вы увидите, что между этими двумя процессами будет иметь место ряд различий. Внимательно изучите этот список, чтоб быть уверенным в корректном применении функции fork().
Если после ветвления по fork() получаются два одинаковых процесса, то как же их различить? Когда вы вызываете fork(), вы тем самым создаете другой процесс, выполняющий тот же самый код и в том же самом местоположении (то есть оба процесса ввернутся из вызова fork()), что и родительский. Рассмотрим пример программы:
int main (int argc, char **argv) {
int retval;
printf("Это определенно родительский процесс\n");
fflush(stdout);
retval = fork();
printf("Кто это сказал?\n");
return (EXIT_SUCCESS);
}
После вызова fork() оба процесса выполнят второй вызов printf()! Если вы запустите эту программу на выполнение, она выведет на экран примерно следующее:
Это определенно родительский процесс
Кто это сказал?
Кто это сказал?
Иными словами, оба процесса выведут вторую строку.
Существует только один способ различить эти два процесса — он заключается в использовании возвращаемого функцией fork() значения, размещенного в retval. Во вновь созданном дочернем процессе retval будет иметь нулевое значение, а в родительском она будет содержать идентификатор дочернего.
Китайская грамота, да? Проясним этот момент еще одним фрагментом программы:
printf("PID родителя равен %d\n", getpid());
fflush(stdout);
if (child_pid = fork()) {
printf("Это родитель, PID сына %d\n", child_pid);
} else {
printf("Это сын, PID %d\n", getpid());
}
Эта программа выведет на экран примерно следующее:
PID родителя равен 4496
Это родитель, PID сына 8197
Это сын, PID 8197
Таким образом, после применения функции fork() вы можете определить, в каком процессе находитесь («отец» это или «сын»), анализируя значение, возвращаемое функцией fork().
Применение функции vfork() по сравнению с обычной fork() позволяет существенно сэкономить на ресурсах, поскольку она делает разделяемым адресное пространство родителя.
Функция vfork() создает «сына», но затем приостанавливает родительский поток до тех пор, пока «сын» не вызовет функцию exec() или не завершится (с помощью exit() или его друзей). В дополнение к этому, функция vfork() будет работать в системах с физической моделью памяти, в то время как функция fork() не сможет, потому что нуждается в создании такого же адресного пространства, а это в физической модели памяти просто невозможно.
Предположим, что у вас есть процесс, и вы еще не создали никаких потоков (т.е., вы работаете с одним потоком — тем, который вызвал функцию main()). Если вызвать функцию fork(), то будет создан другой процесс, и тоже с одним потоком.
Это был простейший пример.
Теперь предположим, что в вашем процессе вы вызвали pthread_create() для создания другого потока. Если вы теперь вызовете функцию fork(), она возвратит ENOSYS (что означает, что функция не поддерживается)! Почему так?
Вы можете верить этому или нет, но это POSIX-совместимая ситуация. POSIX утверждает, что функция fork() может возвращать ENOSYS. На самом же деле происходит вот что: Си-библиотека QNX/Neutrino не рассчитана на ветвление процесса с потоками. Когда вы вызываете pthread_create(), эта функция устанавливает флаг, сигнализирующий что-то типа «не позволяйте этому процессу применять fork(), потому что механизм ветвления в данном случае не определен». Затем, при вызове fork(), этот флаг проверяется и, если он установлен, это принуждает fork() возвратить значение ENOSYS.
Такая реализация была сделана преднамеренно, и причина этого кроется в потоках и мутексах. Если бы этого ограничения не было (и оно может быть снято в будущих версиях QNX/ Neutrino), то вновь созданный процесс, как и предполагается, имел бы то же самое число потоков, что и исходный. Однако, тут возникает сложность, потому что некоторые из исходных потоков могут являться владельцами мутексов. Поскольку вновь создаваемый процесс имеет ту же область данных, что и исходный, библиотека должна была бы отслеживать, какие мутексы принадлежат каким потокам в исходном процессе, и затем дублировать принадлежность мутексов в новом процессе. Это не является невозможным: есть функция, называемая pthread_atfork(), которая обеспечивает процессу возможность обрабатывать такие ситуации. Однако, на момент написания этой книги функциональные возможности pthread_atfork() используются не всеми мутексами в Си-библиотеке QNX/Neutrino.
Очевидно, если вы переносите в QNX/Neutrino программу из другой ОС, вы пожелаете использовать те же механизмы, что и в исходной программе. Я бы посоветовал избегать в новом коде применения функции fork(), и вот почему:
• функция fork() не работает с несколькими потоками — см. выше;
• при работе с fork() в условиях многопоточности вы должны будете зарегистрировать обработчик pthread_atfork() и локировать каждый мутекс по отдельности перед собственно ветвлением, а это усложнит структуру программы;
• дочерние процессы, созданные fork(), копируют все открытые дескрипторы файлов. Как мы увидим позже в главе «Администратор ресурсов», это требует много дополнительных усилий, которые может быть совершенно напрасными, если дочерний процесс затем сразу сделает exec() и тем самым закроет все открытые дескрипторы.
Выбор между семействами функций vfork() и spawn() сводится к переносимости, а также того, что должны делать родительский и дочерний процесс. Функция vfork() задержит выполнение до тех пор, пока дочерний процесс не вызовет exec() или не завершится, тогда как семейство spawn() может позволить работать обоим процессам одновременно. Впрочем, в разных ОС поведение функции vfork() может несколько отличаться.
Теперь, когда мы знаем, как запустить другой процесс, давайте рассмотрим, как осуществить запуск другого потока.
Любой поток может создать другой поток в том же самом процессе; на это не налагается никаких ограничений (за исключением объема памяти, конечно!) Наиболее общий путь реализации этого — использование вызова функций POSIX pthread_create():
#include
int pthread_create(pthread_t *thread,
const pthread_attr_t *attr,
void*(*start_routine)(void*), void *arg);
Функция pthread_create() имеет четыре аргумента :
thread | указатель на , где хранится идентификатор потока |
attr | атрибутная запись |
start_routine | подпрограмма, с которой начинается поток |
arg | параметр, который передается подпрограмме start_routine |
Отметим, что указатель thread и атрибутная запись (attr) — необязательные элементы, вы может передавать вместо них NULL.
Параметр thread может использоваться для хранения идентификатора вновь создаваемого потока. Обратите внимание, что в примерах, приведенных ниже, мы передадим NULL, обозначив этим, что мы не заботимся о том, какой идентификатор будет иметь вновь создаваемый поток.
Если бы нам было до этого дело, мы бы сделали так:
pthread_t tid;
pthread_create(&tid, ...
printf("Новый поток имеет идентификатор %d\n", tid);
Такое применение совершенно типично, потому что вам часто может потребоваться знать, какой поток выполняет какой участок кода.
Небольшой тонкий момент. Новый поток может начать работать еще до присвоения значения параметру tid. Это означает, что вы должны внимательно относиться к использованию tid в качестве глобальной переменной. В примере, приведенном выше, все будет корректно, потому что вызов pthread_create() отработал до использования tid, что означает, что на момент использования tid имел корректное значение.
Новый поток начинает выполнение с функции start_routine(), с параметром arg.
Когда вы осуществляете запуск нового потока, он может следовать ряду четко определенных установок по умолчанию, или же вы можете явно задать его характеристики.
Прежде, чем мы перейдем к обсуждению задания атрибутов потока, рассмотрим тип данных
pthread_attr_t
:
typedef struct {
int flags;
size_t stacksize;
void *stackaddr;
void (*exitfunc)(void *status);
int policy;
struct sched_param param;
unsigned guardsize;
} pthread_attr_t;
В основном эти поля используются как:
flags | Неисчисляемые (булевы) характеристики потока — например, создается поток как «обособленный» или «синхронизирующий». |
stacksize, stackaddr и guardsize | Параметры стека. |
exitfunc | Функция, выполняемая перед завершением потока. |
policy и param | Параметры диспетчеризации. |
Доступны следующие функции:
Управление атрибутами
pthread_attr_destroy()
pthread_attr_init()
Флаги (булевы характеристики)
pthread_attr_getdetachstate()
pthread_attr_setdetachstate()
pthread_attr_getinheritsched()
pthread_attr_setinheritsched()
pthread_attr_getscope()
pthread_attr_setscope()
Параметры стека
pthread_attr_getguardsize()
pthread_attr_setguardsize()
pthread_attr_getstackaddr()
pthread_attr_setstackaddr()
pthread_attr_getstacksize()
pthread_attr_setstacksize()
Параметры диспетчеризации
pthread_attr_getschedparam()
pthread_attr_setschedparam()
pthread_attr_getschedpolicy()
pthread_attr_setschedpolicy()
Список выглядит довольно большим (18 функций), но в действительности нас будет заботить применение только примерно половины функций из этого списка, потому что все эти они сгруппированы по парам «get» — «set», т.е. в каждой паре есть функция как получения параметров (get), так и их установки (set) — за исключением функций pthread_attr_init() и pthread_attr_destroy().
Прежде чем мы исследуем назначения атрибутов, следует отметить одно обстоятельство. Вы обязаны вызвать pthread_attr_init() для инициализации атрибутной записи до момента ее использования, задействовать ее с помощью соответствующей функции (функций) pthread_attr_set*() и только затем вызвать функцию pthread_create() для создания потока. Изменение атрибутной записи после того, как поток уже создан, не будет иметь никакого действия.
Перед использованием атрибутной записи для ее инициализации следует вызвать функцию pthread_attr_init():
...
pthread_attr_t attr;
...
pthread_attr_init(&attr);
Вы можете также вызывать pthread_attr_destroy() для «деинициализации» атрибутной записи потока, но так обычно никто не делает (если не требуется жесткой POSIX-совместимости).
В приведенных ниже описаниях значения по умолчанию помечены комментарием «(по умолчанию)».
Три функции — pthread_attr_setdetachstate(), pthread_attr_setinheritsched() и pthread_attr_setscope() — определяют, создается ли поток как «синхронизирующий» («joinable») или как «обособленный» (detached), наследует ли поток атрибуты диспетчеризации от создающего потока или использует атрибуты диспетчеризации, указанные в функциях pthread_attr_setschedparam() и pthread_attr_setschedpolicy(), и, наконец, имеет ли поток масштаб «системы» или «процесса».
Для создания «синхронизирующего» потока (это значит, что с завершением этого потока можно синхронизировать другой поток при помощи функции pthread_join()), используется вызов:
(по умолчанию)
pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_JOINABLE);
Чтобы создать поток, синхронизация с завершением которого невозможна (такой поток называют «обособленным»), надо было бы сделать так:
pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_DETACHED);
Если вы желаете, чтобы поток унаследовал атрибуты диспетчеризации от потока, его создающего (то есть имел бы ту же самую дисциплину диспетчеризации и тот же самый приоритет, что и родитель), вам следует сделать так:
(по умолчанию)
pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
Для создания потока, который использует атрибуты диспетчеризации, указанные в непосредственно в атрибутной записи (это делается при помощи функций pthread_attr_setsetschedparam() и pthread_attr_setschedpolicy()), вызов выглядел бы следующим образом:
pthread_attr_setinheritsched(&attr,
PTHREAD_EXPLICIT_SCHED);
И наконец, функция pthread_attr_setscope(). Вам не придется ее вызывать никогда. Почему? Потому что QNX/Neutrino поддерживает для потоков только масштаб системы, и соответствующее значение устанавливается по умолчанию, когда вы инициализируете атрибут. (Масштаб системы означает, что за обладание ресурсами все потоки в системе конкурируют друг с другом; масштаб процесса же означает, что потоки конкурируют за процессор только в пределах «своего» процесса, а диспетчеризацию процессов выполняет ядро).
Если вам необходимо вызвать эту функцию, вы можете сделать это только следующим образом:
(по умолчанию)
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
Прототипы функций установки параметров стека в атрибутах потока выглядят следующим образом:
int pthread_attr_setguardsize(pthread_attr_t *attr,
size_t gsize);
int pthread_attr_setstackaddr(pthread_attr_t *attr,
void *addr);
int pthread_attr_setstacksize(pthread_attr_t *attr,
size_t ssize);
Все эти три функции имеют в качестве первого параметра атрибутную запись, вторые параметры перечислены ниже:
gsize | Размер «области защиты». |
addr | Адрес стека, если последний вами предусмотрен. |
ssize | Размер стека. |
Область защиты — это область памяти, расположенная сразу после стека, которую поток не может использовать для записи. Если это происходит (а это означает, что стек вот-вот переполнится), потоку будет послан SIGSEGV. Если размер области защиты равен 0, это означает, что области защиты не предусматривается. Это также подразумевает, что проверка стека на переполнение выполняться не будет. Если размер области защиты отличен от нуля, то это устанавливает его по меньшей мере в общесистемное значение по умолчанию (которое вы можете получить по запросу sysconf(), указав ему константу _SC_PAGESIZE). Заметьте, что ненулевой минимально возможный размер области защиты составляет одну страницу (например, 4 Кб для процессора x86). Также отметьте, что страница защиты не занимает никакой физической памяти, это уловка с применением виртуальной адресации (MMU). Параметр addr представляет собой адрес стека, если вы его задаете явно. Вы можете задать вместо него NULL, что будет значить, что система будет должна сама распределить (и освободить!) стек для потока. Преимущество явного определения стека для потока состоит в том, что вы сможете делать «посмертный» (после аварийного завершения) анализ глубины стека. Это достигается распределением области стека и заполнением ее некоторой «подписью» (например, многократно повторяемой строкой «STACK»), после чего поток запускается на выполнение. По завершении работы потока вы сможете проанализировать область стека и посмотреть, на какую глубину поток затер в ней вашу «подпись», и тем самым определить максимальную глубину стека, использованную потоком в данном конкретном сеансе выполнения.
Параметр ssize определяет размер стека. Если вы явно задаете адрес области стека в параметре addr, то параметр ssize должен задавать размер этой области. Если вы не задаете адрес области стека в параметре addr (то есть передаете вместо адреса NULL), то параметр ssize сообщает системе, стек какого размера следует распределить. Если вы укажете для параметра ssize значение 0 (ноль), система выберет размер стека, заданный по умолчанию. Очевидно, что задавать 0 в качестве параметра ssize и при этом явно указывать адрес стека, используя параметр addr — порочная практика, поскольку в действительности вы тем самым заявляете: «вот указатель на объект, который имеет некоторый заданный по умолчанию размер». Проблема здесь заключается в том, что между размером объекта и передаваемым значением нет никакой связи.
Если стек назначается с помощью параметра attr, данный поток не будет защищен от переполнения этого стека (то есть область защиты будет отсутствовать).
Наконец, если вы определяете PTHREAD_EXPLICIT_SCHED для функции pthread_attr_setinheritsched(), тогда вам необходимо будет как-то определить дисциплину диспетчеризации и приоритет для потока, который вы намерены создать.
Это выполняется с помощью двух функций:
int pthread_attr_setschedparam(pthread_attr_t *attr,
const struct sched_param *param);
int pthread_attr_setschedpolicy(pthread_attr_t *attr,
int policy);
С параметром policy все просто — это либо SCHED_FIFO, либо SCHED_RR, либо SCHED_OTHER.
В рассматриваемой версии QNX/Neutrino параметр SCHED_OTHER интерпретируется как SCHED_RR (карусельная диспетчеризация).
Параметр param — структура, которая содержит единственный элемент: sched_priority. Задайте этот параметр путем прямого присвоения ему значения желаемого приоритета.
Стандартная ошибка, которой следует избегать, заключается в задании PTHREAD_EXPLICIT_SCHED и затем определением только дисциплины диспетчеризации. Проблема состоит в том, что в инициализированной атрибутной записи значение param.sched_priority есть 0 (ноль). Это тот же самый приоритет, что и у «холостого» потока (IDLE), что означает, что создаваемый вами поток будет конкурировать за процессор с «холостым» потоком.
Плавали, знаем. :-)
На том, что QSSL зарезервировала нулевой приоритет только для «холостого» потока, уже «прокололось» немало программистов. Поток с нулевым приоритетом просто не сможет выполняться.
Давайте рассмотрим ряд примеров. Будем считать, что в обсуждаемой программе подключены нужные заголовочные файлы (
и
), а также что поток, который предстоит создать, называется new_thread(), и для него существуют все необходимые прототипы и определения.
Самый обычный способ создания потока — просто оставить везде значения по умолчанию:
pthread_create(NULL, NULL, new_thread, NULL);
В вышеупомянутом примере мы создали наш новый поток со значениями параметров по умолчанию и передали ему NULL в качестве его единственного параметра (третий NULL в указанном выше вызове pthread_create()).
Вообще говоря, вы можете передавать вашему новому потоку что угодно через параметр arg. Например, число 123:
pthread_create(NULL, NULL, new_thread, (void*)123);
Более сложный пример — создание «обособленного» (detached) потока с диспетчеризацией карусельного типа (RR) и приоритетом 15:
pthread_attr_t attr;
// Инициализировать атрибутную запись
pthread_attr_init(&attr);
// Установить «обособленность»
pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_DETACHED);
// Отменить наследование по умолчанию (INHERIT_SCHED)
pthread_attr_setinheritsched(&attr,
PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy(&attr, SCHED_RR);
attr.param.sched_priority = 15;
// И, наконец, создать поток
pthread_create(NULL, &attr, new_thread, NULL);
Для того чтобы увидеть, как «выглядит» многопоточная программа, можно запустить из командного интерпретатора команду
pidin
. Скажем, нашу программу зовут spud
. Если мы выполняем pidin
один раз до создания программой spud потоков, и еще раз — после того, как spud
создала два потока (тогда всего их будет три), то вот как примерно будет выглядеть вывод (я укоротил вывод pidin
для демонстрации только того, что относится к spud
):
# pidin
pid tid name prio STATE Blocked
12301 1 spud 10r READY
# pidin
pid tid name prio STATE Blocked
12301 1 spud 10r READY
12301 2 spud 10r READY
12301 3 spud 10r READY
Вы можете видеть, что процесс
spud
(идентификатор процесса 12301) имеет три потока (столбец «tid» в таблице). Эти три поток» выполняются с приоритетом 10, с диспетчеризацией карусельного (RR) типа (обозначенной как «r» после цифры 10). Все три процесса находятся в состоянии готовности (READY), т. е. готовы использовать процессор, но в настоящее время не выполняются (поскольку в данный момент выполняется другой поток с более высоким приоритетом).
Теперь, когда мы знаем все о создании потоков, давайте рассмотрим, как и где мы можем этим воспользоваться.
Существует два класса задач, где можно было бы эффективно применять многопоточность.
Потоки подобны перегруженным операторам в языке Си++. Поначалу может показаться хорошей идеей перегрузить каждый оператор какой-либо дополнительной интересной функцией, но это сделает программу трудной для восприятия. Аналогичная ситуация с потоками. Вы могли бы создать множество потоков, но это усложнит ваш код и сделает программу малопонятной, а значит, сложной в сопровождении. Разумное же применение потоков, наоборот, внесет в программу дополнительную функциональную ясность.
Применение потоков хорошо там, где можно выполнять операции параллельно — например, в ряде математических задач (графика, цифровая обработка сигналов, и т.д.). Потоки также прекрасны там, где программа должна выполнять несколько независимых функций, при этом использующих общие данные — например, веб-сервер, который обслуживает несколько клиентов одновременно. Эти два класса задач мы здесь и рассмотрим.
Предположим, что мы имеем графическую программу, выполняющую алгоритм трассировки луча. Каждая строка растра на экране зависит от содержимого основной базы данных (которая описывает генерируемую картинку). Ключевым моментом здесь является то, что каждая строка растра не зависит от остальных. Это обстоятельство (независимость строк растра) автоматически приводит к программированию данной задачи как многопоточной.
Ниже приведен однопоточный вариант:
int main (int argc, char **argv) {
int x1;
... // Выполнить инициализации
for (x1 = 0; x1 < num_x_lines; x1++) {
do_one_line(x1);
}
... // Вывести результат
}
Здесь мы видим, что программа итеративно по всем значениям рассчитает необходимые растровые строки.
В многопроцессорных системах эта программа будет использовать только один из процессоров. Почему? Потому что мы не указали операционной системе выполнять что-либо параллельно. Операционная система не настолько умна, чтобы посмотреть на программу и сказать: «Эй, секундочку! У нас ее 4 процессора, и похоже, что у нас тут несколько независимых потоков управления. Запущу-ка я это на всех 4 процессорах сразу!»
Так что это дело разработчика (ваше дело!) — сообщить QNX/Neutrino, какие разделы программы следует выполнять параллельно. Проще всего это можно было бы сделать так:
int main (int argc, char **argv) {
int x1;
... // Выполнить инициализации
for (x1 = 0; x1 < num_x_lines; x1++) {
pthread_create(NULL, NULL, do_one_line, (void*)x1);
}
... // Вывести результат
}
С таким упрощением связано множество проблем. Первая из них (и самая незначительная) состоит в том, что функцию do_one_line() придется модифицировать так, чтобы она могла в качестве своего аргумента принимать значение типа
void*
вместо int
. Это можно легко исправить с помощью оператора приведения типа (typecast).
Вторая проблема несколько сложнее. Скажем, что разрешающая способность дисплея, для которой вы рассчитывали картинку, была равна 1280×1024. Нам пришлось бы создать 1280 потоков! В общем-то, для QNX/Neutrino это не проблема — QNX/Neutrino позволяет создавать до 32767 потоков в одном процессе! Однако, каждый поток должен иметь свой уникальный стек. Если ваш стек имеет разумный размер (скажем 8 Кб), эта программа израсходует под стек 1280×8 Кб (10 мегабайт!) ОЗУ. И ради чего? В вашей системе есть только 4 процессора. Это означает, что только 4 из этих 1280 потоков будут работать одновременно, а другие 1276 потоков будут ожидать доступа к процессору. (В действительности, в данном случае пространство под стек будет выделяться только по мере необходимости. Но тем не менее, это все равно расходование ресурсов впустую — есть ведь еще и другие издержки.)
Более красивым способом решения этой задачи было бы разбить ее на 4 части (по одной подзадаче на каждый процессор), и обрабатывать каждую часть как отдельный поток:
int num_lines_per_cpu;
int num_cpus;
int main (int argc, char **argv) {
int cpu;
... // Выполнить инициализации
// Получить число процессоров
num_cpus = _syspage_ptr->num_cpu;
num_lines_per_cpu = num_x_lines / num_cpus;
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_create(NULL, NULL, do_one_batch, (void*)cpu);
}
... // Вывести результат
}
void* do_one_batch(void *c) {
int cpu = (int)c;
int x1;
for (x1 = 0; x1 < num_lines_per_cpu; x1++) {
do_line_line(x1 + cpu * num_lines_per_cpu);
}
}
Здесь мы запускаем только num_cpus потоков. Каждый поток будет выполняться на отдельном процессоре. А поскольку мы имеем дело с небольшим числом потоков, мы тем самым не засоряем память ненужными стеками. Обратите внимание, что мы получили число процессоров путем разыменования глобальной переменной — указателя на системную страницу _syspage_ptr. (Дополнительную информацию относительно системной страницы можно найти в книге «Building Embedded Systems» (поставляется в комплекте документации по QNX/ Neutrino — прим. ред.) или в заголовочном файле
).
Последняя программа в первую очередь интересна тем, что будет корректно функционировать в системе с одиночным процессором тоже. Просто будет создан только один поток, который и выполнит всю работу. Дополнительные издержки (один стек) с лихвой окупаются гибкостью программы, умеющей работать быстрее в многопроцессорной системе.
Я уже упоминал, что с приведенным выше упрощенным примером программы связана масса проблем. Так вот, еще одна связанная с ним проблема состоит в том, что функция main() сначала запускает целый букет потоков, а затем отображает результаты. Но как функция узнает, когда уже можно выводить результаты?
Заставлять main() заниматься опросом, закончены ли вычисления, противоречит самому замыслу ОС реального времени.
int main (int argc, char **argv) {
...
// Запустить потоки, как раньше
while (num_lines_completed < num_x_lines) {
sleep(1);
}
}
He вздумайте писать такие программы!
Для решения этой задачи существуют два изящных решения: применение функций pthread_join() и barrier_wait().
Самый простой метод синхронизации — это «присоединение» потоков. Реально это действие означает ожидание завершения.
Присоединение выполняется одним потоком, ждущим завершения другого потока. Ждущий поток вызывает pthread_join():
#include
int pthread_join(pthread_t thread, void **value_ptr);
Функции pthread_join() передается идентификатор потока, к которому вы желаете присоединиться, а также необязательный аргумент value_ptr, который может быть использован для сохранения возвращаемого присоединяемым потоком значения (Вы можете передать вместо этого параметра NULL, если это значение для вас не представляет интереса — в данном примере мы так и сделаем).
Где нам брать идентификатор потока? Мы игнорировали его в функции pthread_create(), передав NULL в качестве первого параметра. Давайте исправим нашу программу:
int num_lines_per_cpu;
int num_cpus;
int main(int argc, char **argv) {
int cpu;
pthread_t *thread_ids;
... // Выполнить инициализации
thread_ids = malloc(sizeof(pthread_t) * num_cpus);
num_lines_per_cpu = num_x_lines / num_cpus;
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_create(
&thread_ids[cpu], NULL, do_one_batch, (void*)cpu);
}
// Синхронизироваться с завершением всех потоков
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_join(thread_ids[cpu], NULL);
}
... // Вывести результат
}
Обратите внимание, что на этот раз мы передали функции pthread_create() в качестве первого аргумента указатель на
pthread_t
. Там и будет сохранен идентификатор вновь созданного потока. После того как первый цикл for
завершится, у нас будет num_cpu работающих потоков, плюс поток, выполняющий main(). Потребление ресурсов процессора потоком main() нас мало интересует — этот поток потратит все свое время на ожидание.
Ожидание достигается применением функции pthread_join() к каждому из наших потоков. Сначала мы ждем завершения потока
thread_ids[0]
. Когда он завершится, функция pthread_join() разблокируется. Следующая итерация цикла for
заставит нас ждать завершения потока thread_ids[1]
, и так далее для всех num_cpus потоков.
В этот момент возникает законный вопрос: «А что если потоки завершат работу в обратном порядке?» Другими словами, если имеются 4 процессора, и по какой-либо причине поток, выполняющийся на последнем процессоре (с номером 3), завершит работу первым, затем завершится поток, выполняющийся на процессоре с номером 2, и так далее? Вся прелесть приведенной схемы заключается в том, что ничего плохого не произойдет.
Первое, что произойдет — это то, что pthread_join() блокируется на
thread_ids[0]
. Тем временем пусть завершится поток thread_ids[3]
. Это не окажет абсолютно никакого воздействия на поток main(), который будет по-прежнему ждать завершения первого потока. Затем, пусть завершит работу поток thread_ids[2]
. По-прежнему, никаких последствий. И так далее — пока не завершит работу поток thread_ids[0]
.
В этот момент pthread_join() разблокируется, и мы немедленно переходим к следующей итерации цикла
for
. Вторая итерация цикла for применит pthread_join() к потоку thread_ids[1]
, который не будет блокирован, и итерация завершится немедленно. Почему? Потому что поток, идентифицированный как thread_ids[1]
, уже завершился. Поэтому наш цикл for просто «проскочит» остальные потоки и завершится. В этот момент мы будем знать, что вычислительные потоки синхронизированы, и теперь мы можем выводить результаты отображение.
Когда мы говорили о синхронизации функции main() по моменту завершения рабочих потоков (в параграфе «Синхронизация по отношению к моменту завершения потока», см. выше), мы упомянули два метода синхронизации: один метод с применением функции pthread_join(), который мы только что рассмотрели, и метод с применением барьера.
Возвращаясь к нашей аналогии с процессами в жилом доме, предположим, что семья пожелала где-нибудь отдохнуть на природе. Водитель садится в микроавтобус и запускает двигатель. И ждет. Водитель будет ждать до тех пор, пока все члены семьи не сядут в машину, и только затем можно будет ехать — не можем же мы кого-нибудь оставить!
Точно так происходит и в нашем примере с выводом графики на дисплей. Основной поток должен дождаться того момента, когда все рабочие потоки завершат работу, и только затем можно начинать следующую часть программы.
Однако, отметьте для себя одну важную отличительную особенность. С применением функции pthread_join() мы ожидаем завершения потоков. Это означает, что на момент ее разблокирования потоков нет больше с нами; они закончили работу и завершились.
В случае с барьером, мы ждем «встречи» определенного числа потоков у барьера. Затем, когда заданное число потоков достигнуто, мы их всех разблокируем (заметьте, что потоки при этом продолжат выполнять свою работу).
Сначала барьер следует создать при помощи функции barrier_init():
#include
int barrier_init(barrier_t *barrier, const barrier_attr_t *attr, int count);
Эта функция создает объект типа «барьер» по переданному ей адресу (указатель на барьер хранится в параметре barrier) и назначает ему атрибуты, которые определены в attr (мы будем использовать NULL, чтобы установить значения по умолчанию). Число потоков, которые должны вызывать функцию barrier_wait(), передается в параметре count.
После того как барьер создан, каждый из потоков должен будет вызвать функцию barrier_wait(), чтобы сообщить, что он отработал:
#include
int barrier_wait(barrier_t *barrier);
После того как поток вызвал barrier_wait(), он будет блокирован до тех пор, пока число потоков, указанное первоначально в параметре count функции barrier_init(), не вызовет функцию barrier_wait() (они также будут блокированы). После того как нужное число потоков выполнит вызов функции barrier_wait(), все эти потоки будут разблокированы «одновременно».
Вот пример:
/*
* barrier1.c
*/
#include
#include
#include
#include
barrier_t barrier; // Объект типа «барьер»
void* thread1(void *not_used) {
time_t now;
char buf[27];
time(&now);
printf("Поток 1, время старта %s", ctime_r(&now, buf));
// Выполнить вычисления
// (вместо этого просто сделаем sleep)
sleep(20);
barrier_wait(&barrier);
// После этого момента все потоки уже завершатся
time(&now);
printf("Барьер в потоке 1, время срабатывания %s",
ctime_r(&now, buf));
}
void* thread2(void *not_used) {
time_t now;
char buf[27];
time(&now);
printf("Поток 2, время старта %s", ctime_r(&now, buf));
// Выполнить вычисления
// (вместо этого просто сделаем sleep)
sleep(40);
barrier_wait(&barrier);
// После этого момента все потоки уже завершатся
time(&now);
printf("Барьер в потоке 2, время срабатывания %s",
ctime_r(&now, buf));
}
main() // Игнорировать аргументы
{
time_t now;
char buf[27];
// Создать барьер со значением счетчика 3
barrier_init(&barrier, NULL, 3);
// Создать два потока, thread1 и thread2
pthread_create(NULL, NULL, thread1, NULL);
pthread_create(NULL, NULL, thread2, NULL);
// Сейчас выполняются оба потока
// Ждать завершения
time(&now);
printf("main(): ожидание у барьера, время %s",
ctime_r(&now, buf));
barrier_wait(&barrier);
// После этого момента все потоки уже завершатся
time(&now);
printf("Барьер в main(), время срабатывания %s",
ctime_r(&now, buf));
}
Основной поток создал объект типа «барьер» и инициализировал его значением счетчика, равным числу потоков (включая себя!), которые должны «встретиться» у барьера, прежде чем он «прорвется». В нашем примере этот индекс был равен 3 — один для потока main(), один для потока thread1() и один для потока thread2(). Затем, как и прежде, стартуют потоки вычисления графики (в нашем случае это потоки thread1() и thread2()). Для примера вместо приведения реальных алгоритмов графических вычислений мы просто временно «усыпили» потоки, указав в них
sleep(20)
и sleep(40)
, чтобы имитировать вычисления. Для осуществления синхронизации основной поток (main()) просто блокирует сам себя на барьере, зная, что барьер будет разблокирован только после того, как рабочие потоки аналогично присоединятся к нему.
Как упоминалось ранее, с функцией pthread_join() рабочие потоки для синхронизации главного потока с ними должны умереть. В случае же с барьером потоки живут и чувствуют себя вполне хорошо. Фактически, отработав, они просто разблокируются по функции barrier_wait(). Тонкость здесь в том, что вы обязаны предусмотреть, что эти потоки должны делать дальше! В нашем примере с графикой мы не дали им никакого задания для них — просто потому что мы так придумали алгоритм. В реальной жизни вы могли бы захотеть, например, продолжить вычисления.
Предположим, что мы слегка изменили наш пример так, чтобы можно было проиллюстрировать, почему иногда хорошо иметь несколько потоков даже в системе с одиночным процессором.
В таком модифицированном примере один узел на сети ответственен за вычисление строк растра (как и в примере с графикой, рассмотренном выше). Однако, когда строка рассчитана, ее данные должны быть отправлены по сети другому узлу, который выполняет функцию отображения. Ниже приведена соответствующая модифицированная функция main() (на основе первоначального примера без потоков):
int main(int argc, char **argv) {
int x1;
... // выполнить инициализации
for (x1 = 0; x1 < num_x_lines; x1++) {
do _one_line(x1); // Область «С» на схеме
tx_one_line_wait_ack(x1); // Области «X» и «W» на схеме
}
}
Обратите внимание на то, что мы исключили отображающую часть программы и вместо этого добавили функцию tx_one_line_wait_ack(). Далее предположим, что мы имеем дело с достаточно медленной сетью, но процессор в действительности не занимается передачей данных — он просто отдает их некоторым аппаратным средствам, которые уже сами позаботятся об их передаче. Функция tx_one_line_wait_ack() потребует немного процессорного времени на то, чтобы обеспечить передачу данных аппаратным средствам, и после этого, пока не получит подтверждения о получении данных от удаленного узла, не будет потреблять процессорное время вообще.
Ниже представлена диаграмма, иллюстрирующая загрузку процессора в данном случае (графические вычисления на ней обозначены как «С», передача — как «X», а ожидание подтверждения — как «W»).
Последовательное выполнение, один процессор.
Минуточку! Мы тратим впустую драгоценные секунды, ожидая, пока аппаратура сделает свое дело!
Если мы сделали бы это в многопоточном варианте, мы смогли бы добиться более эффективного использования процессора, так?
Многопоточное выполнение, один процессор
Это уже намного лучше, потому что теперь, даже при том, второй поток затрачивает немного времени на ожидание, мы добились уменьшения суммарного времени вычислений.
Если бы в нашем примере тратилось Tcompute единиц времени на вычисления, Ttx — на передачу и Twait — на ожидание аппарату средств, тогда для первого случая в нашем примере общие затраты времени на обработку были бы равны:
(Tcompute + Ttx + Twait) ∙ num_x_lines,
тогда как затраты времени при использовании двух потоков были бы равны:
(Tcompute + Ttx) ∙ num_x_lines + Twait,
что меньше на величину:
Twait ∙ (num_x_lines – 1),
в предположении, конечно, что Twait ≤ Tcompute.
Отметим, что мы изначально будем ограничены интервалом времени, равным:
Tcompute + Ttx ∙ num_x_lines,
потому что мы должны будем завершить по меньшей мере одно полное вычисление, а также еще и передать данные. Иными словами, мы можем использовать многопоточность для распараллеливания вычислений, но аппаратный ресурс для передачи данных у нас все равно есть только один.
А если бы мы разработали вариант системы с четырьмя потоками и выполнили это в SMP-системе с четырьмя процессорами, это выглядело бы примерно так:
Четыре потока, четыре процессора.
Обратите внимание, насколько каждый из этих четырех центральных процессоров недоиспользован (см. незаштрихованные прямоугольники в строках «Загрузка»). На представленном выше рисунке имеются две интересные зоны. Когда все четыре потока стартуют одновременно, все они вычисляются. К сожалению, когда потоки заканчивают вычисления, они начинают конкурировать за право обладания аппаратными средствами передачи данных (зоны «X» на диаграмме смещены одна относительно другой, поскольку, имея только один передающий ресурс, можно вести только одну передачу одновременно). Это дает нам небольшую аномалию на начальном этапе. После того как потоки отработали этот этап, они оказываются естественным образом синхронизированы по отношению к работе аппаратных средств, так как время передачи данных намного меньше, чем ¼ времени вычислительного цикла. Если игнорировать эту небольшую аномалию в работе системы на начальном этапе, значения временных интервалов в данной системе можно оценить по формуле:
(Tcompute + Ttx + Twait) ∙ num_x_lines / num_cpus
Из этой формулы следует, что применение четырех потоков на четырех процессорах обеспечивает сокращение затрат времени приблизительно в 4 раза по сравнению с аналогичным временем в модели с единственным потоком, т.е. по сравнению с данным! примера, с которого мы начали обсуждение этой проблемы.
Суммируя все то, что мы узнали из анализа примера с использованием многопоточного варианта с одиночным процессором, в идеале мы желали бы иметь больше потоков, чем процессоров, чтобы дополнительные потоки могли «подобрать» время простоя процессоров, которое естественным образом возникает из интервалов ожидания подтверждения (а также из интервалов ожидания, связанных с конкуренцией за передатчик) В этом случае у нас бы получилось примерно вот что: (см. рис. «Восемь потоков, четыре процессора»).
Восемь потоков, четыре процессора.
На этом рисунке предполагается следующее:
• потоки 5, 6, 7 и 8 привязаны к процессорам 1, 2, 3, и 4 (для упрощения);
• передача данных выполняется с более высоким приоритетов чем вычислительные операции;
• прервать передачу нельзя.
Из диаграммы видно, что хоть мы теперь и имеем в два раза больше потоков, чем процессоров, мы по-прежнему сталкиваемся с временными интервалами, в течение которых процессоры «недоиспользованы». На рисунке показаны три таких интервала времени. Эти интервалы обозначены числами, соответствующими номеру процессора, и указаны на временных диаграммах загрузки процессоров в строках «Загрузка»:
1. Поток 1 ожидает подтверждения (состояние «W»), при этом поток 5 завершил вычисления и ждет доступности передатчика.
2. Потоки 2 и 6 ожидают подтверждения.
3. Поток 3 ожидает подтверждения, при этом поток 7 завершил вычисления и ждет доступности передатчика.
Этот пример для нас — важный урок. Бессмысленно просто увеличивать количество процессоров в надежде, что все ваши дела пойдут быстрее, поскольку имеются также и ограничивающие факторы. В некоторых случаях эти ограничивающие факторы определяются просто конструкцией материнской платы мультипроцессорной системы, то есть структурой подсистемы разрешения конфликтов за устройства в память, когда несколько процессоров пытаются обратиться по одному и тому же адресу. В нашем случае обратите внимание, что строка «Использование порта передачи данных» стала все больше заполняться. Если бы мы просто увеличили число процессоров, то в конечном счете столкнулись бы с проблемами, связанными с тем, что соответствующие потоки простаивали бы в ожидании передатчика.
В любом случае, используя потоки-«мусорщики» для сбора неиспользованных ресурсов процессоров, мы сможем обеспечить намного более эффективное использование процессоров. Это время приближенно оценивается по формуле:
(Tcompute + Ttx + Twait) ∙ num_x_lines / num_cpus
При выполнении только вычислений мы ограничены только количеством процессоров; ни один процессор не будет простаивать в ожидании подтверждения. Впрочем, это был бы идеальный случай. Как вы видели из диаграммы, реально периодически возникают временные интервалы, когда один процессор простаивает. Также, как отмечалось ранее, мы в любом случае ограничены по скорости значением:
Tcompute + Ttx ∙ num_x_lines.
При том, что в общем случае вы можете запросто «игнорировать», работаете вы с SMP-архитектурой или с одиночным процессором, есть ряд обстоятельств, которые определенно добавят вам головной боли. К сожалению, это могут быть такие маловероятные события, которые могут проявиться не на этапе разработки, а на этапе его испытаний, в демонстрационных версиях или даже, что самое неприятное, на стадии эксплуатации. Так вот, следование ряду принципов «защитного программирования» избавит вас от связанной с этими проблемами нервотрепки.
Вот краткий перечень того, что следует четко помнить, имея дело с SMP-системой:
• Потоки действительно могут работать и работают параллельно — ни в коем случае не доверяйте при их синхронизации таким механизмам как диспетчеризация FIFO или система приоритетов.
• Потоки могут также выполняться одновременно с обработчиками прерываний (ISR) — это означает, что вам нужно будет не только защитить поток от обработчика прерываний, но и наоборот — обработчик прерываний от потока. Подробнее об этом см. в главе 4, «Прерывания».
• Некоторые операции, которые по вашему мнению должны быть атомарными, в действительности таковыми не являются — это зависит от операции и от процессора. Отметим из такого списка операции типа «чтение- модификация-запись» (например,
++
, --
, &=
, т.д.). См. файл
для анализа возможных замен. (Заметьте, что это не проблема SMP в чистом виде; код для вышеупомянутых операции может выполняться не как атомарный на большинстве RISC-процессоров).
Ранее в разделе «Где хороша многопоточность» говорилось о том, что потокам также находят применение там, где имеет место обработка информации по множеству независимых алгоритмов с разделяемыми структурами данных. При этом, строго говоря, вы могли бы использовать несколько процессов (с одним потоком каждый), явно разделяющих данные, но в некоторых случаях вместо этого гораздо удобнее использовать один многопоточный процесс. Давайте рассмотрим, почему и где здесь можно использовать потоки.
В наших примерах будем отталкиваться от стандартной модели «ввод-обработка-вывод». В наиболее общем случае одна часть этой модели ответственна за получение откуда-либо входных данных, другая часть — за обработку этих данных и преобразование их в некоторые выходные данные (или управляющие воздействия), третья часть — за отправку полученных выходных данных куда надо.
Давайте, во-первых, осмыслим, что мы будем иметь в случае нескольких однопоточных процессов. Для нашей модели у нас было бы три процесса — процесс «ввода», процесс «обработки» и процесс «вывода»:
Система 1: Несколько операций, несколько процессов.
В таком виде наша модель в высшей степени абстрактна, но и в такой же степени «слабо связана». Процесс «ввода» не имеет никакой реальной связи ни с процессом «обработки», ни с процессом «вывода» — он просто отвечает за сбор входных данных и передачу их как-нибудь на следующий этап («этап обработки»).
Мы могли бы сказать то же самое о процессах «обработки» и «вывода» — они также не имеют никакой реальной связи друг с другом. Также здесь предполагается, что обмен данными («ввод — обработка» и «обработка — вывод») осуществляется по некоторому стандартному протоколу (например, через программные каналы, очереди сообщений POSIX, обмен сообщениями QNX/Neutrino — что угодно).
В зависимости от объема потока данных, мы можем пожелать оптимизировать характер связей. Самый простой путь состоит в том, чтобы связать три процесса «теснее». Попробуем теперь вместо использования универсального протокола соединения выбрать схему с разделяемой памятью (на диаграмме толстые стрелки указывают потоки данных; тонкие стрелки — потоки управления):
Система 2: Несколько операций, буферы разделяемой памяти между процессами.
В данной схеме мы «подтянули» связь так, чтобы в результате обеспечить более быстрый и более эффективный обмен данными. В то же время, мы здесь по-прежнему можем применять универсальный протокол для передачи «управляющей» информации, поскольку предполагается, что по сравнению с потоком данных ее не так много.
Система с наиболее тесными связями представлена на следующей схеме:
Система 3: Несколько операций, несколько потоков.
Здесь мы наблюдаем один процесс с тремя потоками. Все три потока неявно разделяют области данных. Обмен управляющей информацией может быть реализован аналогично предыдущим примерам или с помощью ряда примитивов синхронизации потоков (мы уже имели дело с мутексами, барьерами и семафорами — скоро рассмотрим и другие).
Давайте теперь сравним эти три метода по ряду критериев и взвесим все «за» и «против».
В системе 1 связь была самой слабой. Это имеет то преимущество, что каждый из трех процессов может быть легко (то есть при помощи командной строки, в противоположность перекомпиляции/переработке) заменен другим модулем. Это следует из самой природы модели, потому что «единицей модульности» здесь является сам функциональный модуль. Система 1 является также единственной, которая из всех трех может быть распределена по узлам сети QNX/Neutrino. Поскольку информационные связи здесь абстрагированы до некоторого универсального протокола, очевидно, что эти три процесса могут быть выполнены на любой машине в сети. Это может быть очень мощным фактором масштабируемости в Вашем проекте — вам может понадобиться расширить свою сеть до сотен узлов, либо разделенных географически, либо как-то иначе — например, для совместимости с другими аппаратными средствами.
Однако, как только мы переходим к применению разделяемой памяти, мы теряем способность распределять модули по сети. QNX/Neutrino не поддерживает распределенные объекты разделяемой памяти. Таким образом, в Системе 2 мы реально ограничили себя выполнением всех трех процессов на одной и той же машине. Мы не потеряли способность легкой замены или исключения модулей, потому что модули все еще представляют собой отдельные процессы, управляемые командной строкой. Но мы добавили ограничение, в соответствии с которым все заменяемые компоненты должны соответствовать модели с разделяемой памятью.
В системе 3 мы теряем все отмеченные ранее проектные возможности. Мы определенно не можем выполнять различные потоки одного процесса на различных узлах (хотя при этом мы можем выполнять их на различных процессорах в SMP-системе). Также мы потеряли наши возможности переконфигурации — теперь нам обязательно понадобится механизм явного доопределения, который из алгоритмов «ввода», «обработки» и «вывода» мы должны использовать (эту проблему можно решить с помощью разделяемых объектов, также известных как динамические библиотеки — DLL).
Так почему же я должен проектировать свою систему, используя многопоточность, как в Системе 3? Почему бы мне для обеспечения максимальной универсальности не выбрать Систему 1?
Ну, даже при том, что Система 3 является наиболее ригидной, она, скорее всего, окажется самой быстродействующей. В ней не будет переключений контекста между потоками в различных процессах, мне не придется настраивать разделяемую память, а также применять абстрактные методы синхронизации типа программных каналов, очередей сообщений POSIX или обмен сообщениями QNX/Neutrino для обеспечения доставки данных или управляющей информации — я смогу использовать базовые примитивы синхронизации потоков на уровне ядра. Другим преимуществом является то, что при запуске системы, состоящей из одного процесса (с тремя потоками), я могу быть уверен, что все, что мне понадобится далее, уже загружено с носителя (то есть потом не выяснится что-то типа «Опа! А нужного-то драйвера на диске и нету...») И, наконец, Система 3 также, скорее всего, будет наиболее компактной, потому что не придется использовать три отдельных копии информации, характерной для процессов (например, дескрипторы файлов).
Мораль: знайте, какое решение сулит какие выгоды и какие потери, и применяйте то, что будет оптимальным для вашего конкретного проекта.
Мы уже обсудили:
• мутексы;
• семафоры;
• барьеры.
Давайте теперь завершим нашу дискуссию о синхронизации, обсудив следующее:
• блокировки чтения/записи (reader/writer locks);
• ждущие блокировки (sleepons);
• условные переменные (condition variables);
• дополнительные сервисы QNX/QNX/Neutrino.
Блокировки чтения/записи применяются точно в соответствии с их названием: несколько «читателей» могут использовать ресурс в отсутствие «писателей», или один «писатель» может использовать ресурс в отсутствие «читателей» и других «писателей».
Эта ситуация возникает достаточно часто для того, чтобы создать отдельный примитив синхронизации специально для этих целей.
У вас будет часто возникать ситуация разделения структуры данных группой потоков. Очевидно, что в любой момент времени только один поток может записывать данные в эту структуру. Если бы запись велась более чем одним потоком одновременно, одни потоки могли бы записать свои данные поверх данных других потоков. Для предотвращения таких ситуаций поток-«писатель» должен эксклюзивно получить блокировку чтения/записи («rwlock»), обозначив этим, что он и только он имеет доступ к структуре данных. Заметьте, что это исключительное право доступа «строго контролируется на добровольных началах» — обеспечение того, чтобы все потоки, которые пользуются указанной областью данных, синхронизировались с использованием блокировок чтения/ записи, зависит только от вас.
С «читателями» ситуация противоположная. Поскольку считывание области данных — неразрушающая операция, любое число потоков может считывать данные (даже если ту же часть данных в этот момент считывает другой поток). Сложным моментом здесь является то, что никто не должен производить запись в область данных, из которой в этот момент ведется чтение. В противном случае, считывающие потоки могут быть «введены в заблуждение» — например, поток мог бы считать часть данных, затем быть вытесненным потоком-«писателем» затем возобновиться и продолжить считывание данных, но уже обновленных! Это может закончиться нарушением целостности данных.
Давайте рассмотрим вызовы, которые вы могли бы использовать при применении блокировок чтения/записи.
Первые два вызова используются для инициализации внутренних областей памяти для rwlock-блокировок (чтения/записи):
int pthread_rwlock_init(pthread_rwlock_t *lock,
const pthread_rwlockattr_t *attr);
int pthread_rwlock_destroy(pthread_rwlock_t *lock);
Функция pthread_rwlock_init() принимает аргумент lock (типа
pthread_rwlock_t
) и инициализирует его атрибутами, указанными в параметре attr. В нашем примере мы применим атрибут NULL, что будет означать «применить значения по умолчанию». Более подробно об этом см. документацию на функции:
pthread_rwlockattr_init();
pthread_rwlockattr_destroy();
pthread_rwlockattr_getpshared();
pthread_rwlockattr_setpshared().
Когда мы закончим свои дела с блокировкой чтения/записи, её следует уничтожить функцией pthread_rwlock_destroy().
Никогда не используйте блокировку, которая либо уже уничтожена, либо еще не инициализирована.
Далее, мы должны выбрать блокировку подходящего типа. Как отмечалось выше, в основном применяются два режима блокировки: «читателю» желательно иметь «неэксклюзивный» доступ, а для «писателю» — «эксклюзивный». Для упрощения имен, функции названы по именам своих пользователей:
int pthread_rwlock_rdlock(pthread_rwlock_t *lock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *lock);
int pthread_rwlock_wrlock(pthread_rwlock_t* lock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *lock);
Существует четыре функции блокировки, а не две, как вы могли бы предположить. Очевидно, «предполагаемыми» функциями были pthread_rwlock_rdlock() и pthread_rwlock_wrlock(), используемые «читателями» и «писателями», соответственно.
Это — собственно блокирующие вызовы: если блокировка для выбранной операции недоступна, поток будет блокирован. Когда блокировка становится доступной в соответствующем режиме, поток будет разблокирован, из чего он сможет предположить, что теперь можно спокойно обращаться к защищенному блокировкой ресурсу.
Иногда, тем не менее, поток может не захотеть блокироваться, желая вместо этого просто узнать, доступна ли нужная блокировка. Для этого и существуют версии функций, содержащие в имени «try» («проверка»). Важно отметить, что «проверочные» версии получат блокировку, если она доступна, но если нет, тогда они не будут блокированы, а только возвратят код ошибки. Причина, по которой они должны получать блокировку, если она доступна, очень проста. Предположим, что поток хочет получить блокировку на чтение, но не хочет ждать, если блокировка окажется недоступной. Поток вызывает функцию pthread_rwlock_tryrdlock(), и оказывается, что блокировка доступна. Если бы функция pthread_rwlock_tryrdlock() не захватывала доступную блокировку немедленно, могли бы произойти неприятные вещи — наш поток мог бы быть, к примеру, вытеснен другим потоком, а тот, в свою очередь, мог бы блокировать нужный нам ресурс. Поскольку первому потоку фактически не была предоставлена блокировка, после возобновления ему придется вызывать pthread_rwlock_rdlock(), и вот теперь он будет заблокирован, поскольку ресурс более недоступен. Иными словами, в такой ситуации даже поток, не желающий блокироваться и поэтому вызывающий «проверочную» версию, по-прежнему может быть заблокирован!
Наконец, независимо от того, как блокировка нами применялась, нам необходим способ ее освобождения:
int pthread_rwlock_unlock(pthread_rwlock_t* lock);
После того как поток выполнил нужную операцию с ресурсом, он освобождает блокировку, вызывая функцию pthread_rwlock_unlock(). Если блокировка теперь становится доступной в режиме, который запрошен и ожидается другим потоком, то этот ждущий поток будет переведен в состояние готовности (READY).
Отметим, что мы не смогли бы реализовать такую форму синхронизации только с помощью мутекса. Мутекс рассчитан только на один поток, что было бы хорошо в случае записи (чтобы только один поток мог использовать ресурс в определенный момент времени), но оплошал бы в случае считывания, потому что не допустил бы к ресурсу более чем одного «читателя». Семафор также был бы бесполезен, потому что нельзя было бы отличить два режима доступа — применение семафора могло бы обеспечить доступ нескольких «читателей», но если бы семафором попытался завладеть «писатель», его вызов ничем бы не отличался от вызова «читателей», что вызвало бы некрасивую ситуацию с множеством «читателей» и множеством же «писателей»!
Другая типовая ситуация в многопоточных программах — это потребность заставить поток «ждать чего-либо». Этим «чем- либо» может являться фактически что угодно! Например, когда доступны данные от устройства, или когда конвейерная лента находится в нужной позиции, или когда данные сохранены на диск, и т.д. Еще одна хитрость этой ситуации состоит в том, что одного и того же события могут ожидать несколько потоков.
Для таких целей мы могли бы использовать либо условную переменную (condition variable), о которой речь ниже, либо, что гораздо проще, ждущую блокировку (sleepon).
Для применения ждущих блокировок надо выполнить несколько операций. Рассмотрим сначала вызовы, а затем вернемся к использованию ждущих блокировок.
int pthread_sleepon_lock(void);
int pthread_sleepon_unlock(void);
int pthread_sleepon_broadcast(void *addr);
int pthread_sleepon_signal(void *addr);
int pthread_sleepon_wait(void *addr);
He дайте префиксу pthread_ себя обмануть. Эти функции не предусмотрены стандартами POSIX.
Как было отмечено ранее, потоку может быть необходимо ждать какого-нибудь события. Наиболее очевидный выбор из представленного выше списка функций — это функция pthread_sleepon_wait(). Но сначала поток должен проверить, надо ли ждать. Давайте приведем пример. Один поток представляет собой поток-«поставщик», который получает данные от неких аппаратных средств. Другой поток — поток-«потребитель» и он неким образом обрабатывает поступающие данные. Рассмотрим сначала поток-«потребитель»:
volatile int data_ready = 0;
consumer() {
while (1) {
while (!data_ready) {
// wait
}
// Обработать данные
}
}
«Потребитель» вечно находится в своем главном обрабатывающем цикле (
while(1)
). Первое, что он проверяет — это флаг data_ready. Если этот флаг равен 0, это означает, что данных нет, и их надо ждать. Впоследствии поток-«производитель» должен будет как-то «разбудить» его, и тогда поток-«потребитель» должен будет повторно проверить состояние флага data_ready. Положим, что происходит именно это. Поток-«потребитель» анализирует состояние флага и определяет, что флаг равен 1, то есть данные теперь доступны. Поток-«потребитель» переходит к обработке поступивших данных, после чего он должен снова проверить, не поступили ли новые данные, и так далее.
Здесь мы можем столкнуться с новой проблемой. Как «потребителю» сбрасывать флаг data_ready согласованно с «производителем»? Очевидно, нам понадобится некоторая форма монопольного доступа к флагу, чтобы в любой момент времени только один из этих потоков мог модифицировать его. Метод, который применен в данном случае, заключается в применения мутекса, но это внутренний мутекс библиотеки ждущих блокировок, так что мы сможем обращаться к нему только с помощью двух функций: pthread_sleepon_lock() и pthread_sleepon_unlock(). Давайте модифицируем наш поток-«потребитель»:
consumer() {
while (1) {
pthread_sleepon_lock();
while (!data_ready) {
// WAIT
}
// Обработать данные
data_ready = 0;
pthread_sleepon_unlock();
}
}
Здесь мы добавили «потребителю» установку и снятие блокировки. Это означает, что потребитель может теперь надежно проверять флаг data_ready, не опасаясь гонок, а также надежно его устанавливать.
Великолепно! А как насчет собственно процесса ожидания? Как мы и предполагали ранее, там действительно применяется вызов функции pthread_sleepon_wait(). Вот второй while-цикл:
while (!data_ready) {
pthread_sleepon_wait(&data_ready);
}
Функция pthread_sleepon_wait() в действительности выполняет три действия:
1. Разблокирует мутекс библиотеки ждущих блокировок.
2. Выполняет собственно операцию ожидания.
3. Снова блокирует мутекс библиотеки ждущих блокировок.
Причина обязательной разблокировки/блокировки мутекса библиотеки проста: поскольку суть мутекса состоит в обеспечении взаимного исключения доступа к флагу data_ready, мы хотим запретить потоку-«производителю» изменять флаг data_ready, пока мы его проверяем. Но если мы не разблокируем флаг впоследствии, то поток-«производитель» не сможет его установить, чтобы сообщить нам о доступности данных! Операция повторной блокировки выполняется автоматически исключительно для удобства, чтобы вызвавший функцию pthread_sleepon_wait() поток не беспокоился о состоянии блокировки после «пробуждения».
Давайте перейдем теперь к потоку-«производителю» и рассмотрим, как он использует библиотеку ждущих блокировок. Вот его полная реализация:
producer() {
while (1) {
// Ждать прерывания от оборудования...
pthread_sleepon_lock();
data_ready = 1;
pthread_sleepon_signal(&data_ready);
pthread_sleepon_unlock();
}
}
Как вы видите, поток-«производитель» также блокирует мутекс, чтобы получить монопольный доступ к флагу data_ready перед его установкой.
Клиента «пробуждает» не установка флага data_ready в единицу (1), а вызов функции pthread_sleepon_signal()!
Давайте рассмотрим происходящее в подробностях. Определим состояния «потребителя» и «производителя» следующим образом:
Состояние | Означает |
---|---|
CONDVAR | ожидание соответствующей ждущей блокировке условной переменной |
MUTEX | ожидание мутекса |
READY | состояние готовности, т.е., готов выполняться или уже выполняется |
INTERRUPT | ожидание прерывания от аппаратных средств |
Действие | Владелец мутекса | Состояние «потребителя» | Состояние «производителя» |
---|---|---|---|
«потребитель» блокирует мутекс | «потребитель» | READY | INTERRUPT |
«потребитель» проверяет флаг data_ready | «потребитель» | READY | INTERRUPT |
потребитель вызывает функцию pthread_sleepon_wait() | «потребитель» | READY | INTERRUPT |
функция pthread_sleepon_wait() разблокирует мутекс | мутекс свободен | READY | INTERRUPT |
функция pthread_sleepon_wait() блокируется | мутекс свободен | CONDVAR | INTERRUPT |
пауза до прерывания | мутекс свободен | CONDVAR | INTERRUPT |
аппаратные средства генерируют данные | мутекс свободен | CONDVAR | READY |
«производитель» блокирует мутекс | «производитель» | CONDVAR | READY |
«производитель» устанавливает флаг data_ready | «производитель» | CONDVAR | READY |
«производитель» вызывает pthread_sleepon_signal() | «производитель» | CONDVAR | READY |
«потребитель» «пробуждается», функция pthread_sleepon_wait() пытается заблокировать мутекс | «производитель» | MUTEX | READY |
«производитель» разблокирует мутекс | мутекс свободен | MUTEX | READY |
«потребитель» получает мутекс | «потребитель» | READY | READY |
«потребитель» обрабатывает данные | «потребитель» | READY | READY |
«производитель» ждет новых данных от аппаратуры | «потребитель» | READY | INTERRUPT |
пауза («потребитель» обрабатывает полученные данные) | «потребитель» | READY | INTERRUPT |
«потребитель» завершает обработку и разблокирует мутекс | мутекс свободен | READY | INTERRUPT |
«потребитель» возвращается в начало цикла и блокирует мутекс | «потребитель» | READY | INTERRUPT |
Последняя строка в таблице повторяет первую — мы совершили один полный цикл.
Каково назначение флага data_ready? Он служит для двух целей:
• Он является флагом состояния — посредником между «потребителем» и «производителем», указывающим на состояние системы. Если флаг установлен в состояние 1, это означает, что данные доступны для обработки; если этот флаг установлено в состояние 0, это означает, что данных нет, и поток-потребитель должен быть заблокирован.
• Он выполняет функцию «места, где происходит синхронизация со ждущей блокировкой». Более формально говоря, адрес переменной data_ready используется как уникальный идентификатор объекта, по которому осуществляется ждущая блокировка. Мы запросто могли бы применить «
(void*)12345
» вместо «&data_ready
» — библиотеке ждущих блокировок все равно, что это за идентификатор, лишь бы он был уникален и корректно использовался. Использование же в качестве идентификатора адреса переменной есть надежный способ сгенерировать уникальный номер, поскольку не бывает же двух переменных с одинаковым адресом!
• К обсуждению различий между функциями pthread_sleepon_signal() и pthread_sleepon_broadcast() мы еще вернемся в разговоре об условных переменных.
Условные переменные (или «condvars») очень похожи на ждущие блокировки, которые мы рассматривали выше. В действительности, ждущие блокировки — это надстройка над механизмом условных переменных, и именно поэтому в таблице, иллюстрировавшей использование ждущих блокировок, у нас встречалось состояние CONDVAR. Функция pthread_cond_wait() точно так же освобождает мутекс, ждет, а затем повторно блокирует мутекс, аналогично функции pthread_sleepon_wait().
Давайте опустим вступление и обратимся к нашему примеру о «производителе» и «потребителе» из раздела о ждущих блокировках, но вместо ждущих блокировок будем использовать условные переменные. А затем уже обсудим вызовы.
/*
* cp1.c
*/
#include
#include
int data_ready = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
void* consumer(void *notused){
printf("Это поток-потребитель...\n");
while (1) {
pthread_mutex_lock(&mutex);
while (!data_ready) {
pthread_cond_wait(&condvar, &mutex);
}
// Обработать данные
printf("Потребитель: получил данные от производителя\n");
data_ready = 0;
pthread_cond_signal(&condvar);
pthread_mutex_unlock(&mutex);
}
}
void* producer (void *notused) {
printf("Это поток-производитель...\n");
while (1) {
// Получить данные от оборудования
// (мы имитируем это при помощи sleep(1))
sleep(1);
printf("Производитель: получил данные от h/w\n");
pthread_mutex_lock(&mutex);
while (data_ready) {
pthread_cond_wait(&condvar, &mutex);
}
data_ready = 1;
pthread_cond_signal(&condvar);
pthread_mutex_unlock(&mutex);
}
}
main() {
printf(
"Начало примера с производителем и потребителем...\n");
// Создать поток-производитель и поток-потребитель
pthread_create(NULL, NULL, producer, NULL);
pthread_create(NULL, NULL, consumer, NULL);
// Дать потокам немного повыполняться
sleep(20);
}
Этот пример в значительной степени похож на программу с применением ждущей блокировки, с небольшими отличиями (мы добавили несколько вызовов printf(), а также функцию main(), чтобы программа могла работать!) Первое отличие, которое бросается в глаза, — здесь использован новый тип данных,
pthread_cond_t
. Это просто декларация для условной переменной; мы назвали нашу условную переменную condvar.
Следующее, что видно из примера, — это то, что структура «потребителя» идентична таковой в предыдущем примере с ждущей блокировкой. Мы заменили функции pthread_sleepon_lock() и pthread_sleepon_unlock() на стандартные мутекс-ориентированные версии (pthread_mutex_lock() и pthread_mutex_unlock()). Функция pthread_sleepon_wait() была заменена на функцию pthread_cond_wait().
Основное различие здесь состоит в том, что библиотека ждущих блокировок имеет скрытый внутренний мутекс, а при использовании условных переменных мутекс передается явно. Последний способ дает нам больше гибкости.
И, наконец, обратите внимание на то, что мы использовали функцию pthread_cond_signal() вместо функции pthread_sleepon_signal() (опять же, с явной передачей мутекса).
В разделе о ждущих блокировках мы обещали обсудить различие между функциями pthread_sleepon_broadcast() и pthread_sleepon_signal(). Заодно поговорим и о различии между двумя аналогичными функциями, имеющими отношение к условным переменным: pthread_cond_signal() и pthread_cond_broadcast().
В двух словах, функция в варианте «signal» разблокирует только один поток. Например, если бы несколько потоков находилось в ожидании по функции «wait», и некий поток вызвал бы функцию pthread*_signal(), то был бы разблокирован только один из ждущих потоков. Который из них? Тот, у которого наивысший приоритет. Если имеется два или более потоков с одинаковым приоритетом, порядок «пробуждения» будет не определен. Применение же варианта pthread*_broadcast() приведет к тому что будут разблокированы все ожидающие потоки.
Разблокировать все потоки может показаться излишним. Но с другой стороны, разблокировать только один (причем случайный поток тоже не совсем корректно.
Поэтому мы должны думать, где имеет смысл использовать какой вариант. Очевидно, что если у вас только один ждущий поток, как у нас и было во всех вариантах «потребителя», функция pthread*_signal() прекрасно справится — будет разблокирован один поток, и как раз тот, который нужно (потому что других просто нет).
В ситуации с несколькими потоками в первую очередь следует выяснить: а почему они ждут? Обычно на этот вопрос есть два ответа:
• все потоки рассматриваются как эквивалентные и реально образуют пул доступных потоков, готовых к обработке некоторого запроса;
• все потоки являются уникальными, и каждый из них ждет соблюдения своего специфического условия.
В первом случае мы можем представить себе, что код всех потоков имеет примерно следующий вид:
/*
* cv1.c
*/
#include
#include
pthread_mutex_t mutex_data = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv_data = PTHREAD_COND_INITIALIZER;
int data;
thread1() {
for (;;) {
pthread_mutex_lock(&mutex_data);
while (data == 0) {
pthread_cond_wait(&cv_data, &mutex_data);
}
// Сделать что-нибудь
pthread_mutex_unlock(&mutex_data);
}
}
В этом случае абсолютно неважно, который именно из потоков получит данные — главное, чтобы хотя бы один сделал это и произвел над этими данными необходимые действия.
Однако, если ваш код подобен приведенному ниже, все будет несколько по-иному:
/*
* cv2.c
*/
#include
#include
pthread_mutex_t mutex_xy = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv_xy = PTHREAD_COND_INITIALIZER;
int x, y;
int isprime(int);
thread1() {
for (;;) {
pthread_mutex_lock(&mutex_xy);
while ((x > 7) && (y != 15)) {
pthread_cond_wait(&cv_xy, &mutex_xy);
}
// Сделать что-нибудь
pthread_mutex_unlock(&mutex_xy);
}
}
thread2() {
for (;;) {
pthread_mutex_lock(&mutex_xy);
while (!isprime(x)) {
pthread_cond_wait(&cv_xy, &mutex_xy);
}
// Сделать что-нибудь
pthread_mutex_unlock(&smutex_xy);
}
}
thread3() {
for (;;) {
pthread_mutex_lock(&mutex_xy);
while (x != y) {
pthread_cond_wait(&cv_xy, &mutex_xy);
}
// Сделать что-нибудь
pthread_mutex_unlock(&mutex_xy);
}
}
В этом случае пробуждение одного потока ничего не даст! Здесь мы обязаны «разбудить» все три потока, чтобы каждый из них проверил соблюдение своего условия.
Это в полной мере отражает второй вариант ответа на наш вопрос «а почему они ждут?» Так как все потоки все ждут соблюдения различных условий (поток thread1() ждет, пока значение x не станет меньше или равно 7, или пока значение у не станет равным 15, поток thread2() ждет, пока значение x не станет простым числом, а поток thread3() ждет, пока x не станет равным у), у нас нет никакого выбора, кроме как «разбудить» все потоки «одновременно».
Ждущие блокировки имеют одно основное преимущество в сравнении с условными переменными. Предположим, что вам надо синхронизировать множество объектов. Используя условные переменные, вы бы ассоциировали с каждым объектом отдельную условную переменную — если бы у вас было M объектов, вы, скорее всего, определили бы M условных переменных. При применении же ждущих блокировок соответствующие им условные переменные создаются динамически по мере постановки потоков на ожидание, поэтому в этом случае на M объектов и N блокированных потоков у вас было бы максимум N, а не M условных переменных.
Однако, условные переменные более универсальны, чем ждущие блокировки, и вот почему:
1. Ждущие блокировки в любом случае основаны на условных переменных.
2. Мутексы ждущих блокировок скрыты в библиотеке; условные переменные позволяют вам задавать его явно.
Первый пункт сам по себе достаточно убедителен. :-) Второй, однако, имеет еще и практический смысл. Когда мутекс скрыт в библиотеке, это означает, что он может быть только один на процесс, независимо от числа потоков в этом процессе или от количества переменных. Это может быть сильно ограничивающим фактором, особенно если принять во внимание, что вам придется использовать один-единственный мутекс для синхронизации доступа всех имеющихся потоков в процессе ко всем нужным им переменным!
Намного лучшая схема состоит в применении нескольких мутексов — по одному на каждый набор данных — и явно сопоставлять им условные переменные по мере необходимости. Как мощь, так и опасность этого подхода заключаются в том, что ни на этапе компиляции, ни на этапе выполнения не будет производиться никаких проверок, и вам придется самим следить за:
• блокировкой мутексов перед доступом к соответствующим переменным;
• применением правильного мутекса для каждой переменной;
• применением правильной условной переменной для соответствующих мутекса и переменной (данных).
Самый простой путь решения этих проблем — грамотно проектировать и тщательно проверять, а также заимствовать приемы объектно-ориентированного программирования (например, встраивать мутексы в структуры данных, создавать для обращения к структурам данных специализированные подпрограммы, и т.д.). Разумеется, то, в какой степени вы примените первый, второй, или оба варианта, будет зависеть не только от вашего стиля программирования, но и от требований производительности.
Ключевыми моментами при использовании условных переменных являются:
1. Мутексы следует использовать для проверки и изменения переменных.
2. Условные переменные следует использовать в качестве «точки встречи».
Ниже представлена иллюстрация этого:
Связь мутексов и условных переменных по схеме «один к одному»
Одно интересное замечание. Поскольку никаких проверок не выполняется, вы можете, например, связать один набор переменных с мутексом «MutexABC», другой — с мутексом «MutexDEF», и сопоставить обоим наборам переменных одну и ту же условную переменную «CondvarABCDEF»:
Связь мутексов и условных переменных по схеме «один ко многим».
Это весьма полезное свойство. Поскольку мутекс должен использоваться для «проверки и изменения» всегда, это подразумевает, что я должен буду выбрать правильный мутекс всякий раз, когда мне понадобится доступ к некоей переменной. Вполне логично — если я, скажем, проверяю переменную «С», то, очевидно, мне потребуется заблокировать мутекс «MutexABC». А что если я хочу изменить переменную «E»? Хорошо, перед этим я должен буду захватить мутекс «MutexDEF». Затем я ее изменяю и сообщаю об этом другим потокам через условную переменную «CondvarABCDEF», после чего освобождаю мутекс.
А теперь смотрите, что происходит. Толпа потоков, ждавших на условии «CondvarABCDEF», вдруг резко «просыпается» (по функции pthread_cond_wait()). Их функции ожидания немедленно пытаются повторно захватить мутекс. Критическим моментом здесь является то, что мутексов два. (В зависимости от того, изменения какой переменной поток ждал, его функция ожидания попытается захватить либо MutexABC, либо MutexDEF — прим. ред.) Это означает, что в SMP-системе возникли бы две конкурирующие очереди потоков, и в каждой потоки будут проверять как бы независимые переменные, используя при этом независимые мутексы. Круто, да?
QNX/Neutrino позволяет делать еще ряд изящных вещей. POSIX утверждает, что с мутексом должны работать потоки одного и того же процесса, но позволяет в соответствующей реализации эту концепцию расширять. В QNX/Neutrino это расширение сводится к тому, что мутекс может использоваться потоками различных процессов. Чтобы понять, почему это работает, вспомните: то, что мы рассматриваем как «операционную систему», реально состоит из двух частей — ядра, которое занимается диспетчеризацией, и администратора процессов, который, наряду со всем остальным, заботится о защите памяти и «процессах». Мутекс — всего-навсего объект синхронизации потоков. Поскольку ядро работает только с потоками, то реально ему все равно, какие потоки работают в каких процессах, это уже забота администратора.
Итак, если вы установили область разделяемой памяти между двумя процессами и разместили в ней мутекс, ничто не мешает вам с его помощью синхронизировать потоки в двух (или более!) процессах — функции pthread_mutex_lock() и pthread_mutex_unlock() будут работать точно так же.
Другое существенное дополнение в QNX/Neutrino — это понятие пула потоков. Вы будете часто обращать внимание в ваших программах на то обстоятельство, что вам хотелось бы иметь несколько потоков и управлять их поведением в определенных пределах. Например, для сервера вы можете решить, что первоначально в ожидании сообщения от клиента должен быть блокирован только один поток. Когда этот поток получит сообщение и пойдет обслуживать запрос, вы можете принять решение о том, что хорошо было бы создать другой поток и блокировать его в ожидании на случай поступления другого запроса — тогда этот запрос будет кому обработать. И так далее. Через некоторое время, когда все запросы будут обслужены, у вас может оказаться большое число потоков, бездействующих в ожидании. Чтобы не расходовать ресурсы впустую, вам, возможно, захочется уничтожить некоторые из этих «лишних» потоков.
Подобные операции в жизни — обычное дело, и для задач такого рода QNX/Neutrino предоставляет для этого специальную библиотеку.
В более ранних (до 2.00) версиях QNX/Neutrino была предусмотрена подобная функциональность, но она была скрыта в библиотеке администратора ресурсов. В версии 2.00 эти функции были вынесены из библиотеки администратора ресурсов в отдельную библиотеку. Мы еще вернемся к функциям работы с пупами потоков в главе «Администраторы ресурсов».
В рамках данного обсуждения важно понять, что следует различать два режима потоков в пулах:
• режим блокирования;
• режим обработки.
В режиме блокирования поток обычно вообще не использует ресурсы процессора. В типовом сервере это соответствует ситуации, когда поток ждет сообщения. Противоположностью этого режима является режим обработки, в котором поток может как использовать, так и не использовать ресурсы процессора — это зависит от структуры процесса. Чуть позже мы рассмотрим функции работы с пулами потоков, и вы увидите, что они дают возможность управлять количеством как блокированных, так и обрабатывающих потоков.
Для работы с пулами потоков в QNX/Neutrino предусмотрены следующие функции:
#include
thread_pool_t *thread_pool_create(
thread_pool_attr_t *attr, unsigned flags);
int thread_pool_destroy(thread_pool_t *pool);
int thread_pool_start(void *pool);
Как видно из имен функций, вы в первую очередь создаете пул потоков, используя функцию thread_pool_create(), а затем запускаете этот пул при помощи функции thread_pool_start(). Когда вы закончили свои дела с пулом потоков, вы можете использовать функцию thread_pool_destroy() для его уничтожения. Заметьте, что функция thread_pool_destroy() может вам вообще не понадобиться — например, когда ваша программа суть сервер, который работает «вечно».
Итак, первая функция, на которую следует обратить внимание — это функция thread_pool_create(). У нее два параметра: attr и flags. Параметр attr — атрибутная запись, которая определяет рабочие параметры пула потоков (см.
):
typedef struct _thread_pool_attr {
// Функции и дескриптор пула потоков
THREAD_POOL_HANDLE_T *handle;
THREAD_POOL_PARAM_T *(*block_func)
(THREAD_POOL_PARAM_T *ctp);
void (*unblock_func)(THREAD_POOL_PARAM_T *ctp);
int (*handler_func) (THREAD_POOL_PARAM_T *ctp);
THREAD_POOL_PARAM_T *(*context_alloc)
(THREAD_POOL_HANDLE_T *handle);
void *(*context_free)(THREAD_POOL_PARAM_T *ctp);
// Параметры пула потоков
pthread_attr_t *attr;
unsigned short lo_water;
unsigned short increment;
unsigned short hi_water;
unsigned short maximum;
} thread_pool_attr_t;
Я разбил определение типа
thread_pool_attr_t
на два раздела, один из которых содержит функции и дескриптор для потоков в пуле, а в другом — рабочие параметры пула.
Сначала проанализируем «параметры пула потоков», чтобы понять, как можно управлять числом потоков в пуле и их атрибутами. Имейте в виду, что здесь мы будем говорить о «режиме блокирования» и «режиме обработки» (далее, когда мы будем рассматривать функции исходящих вызовов (callout functions), мы увидим, как эти эти режимы соотносятся).
Приведенный ниже рисунок иллюстрирует связи между параметрами lo_water, hi_water и maximum.
Жизненный цикл потока в пуле потоков.
(Заметьте, что как «CA» здесь обозначается функция context_alloc(), как «CF» — функция context_free(), как «режим блокирования» — функция block_func(), а как «режим обработки» — функция handler_func().
attr | Это атрибутная запись, которая применяется при создании потока. Мы уже обсуждали эту структуру ранее (в разделе «Атрибутная запись потока»). Вспомните — это та самая структура, которая задает характеристики нового потока: приоритет, размер стека, и т.д. |
lo_water | (От «Low watermark», буквально — «нижняя ватерлиния» — прим. ред.) Этот параметр задает минимальное количество потоков, которые должны находиться в режиме блокирования. В типовом сервере это было бы количество потоков, например, ждущих запроса. Если число ждущих потоков меньше, чем значение параметра lo_water, (например, потому что мы только что приняли сообщение, и один из ждущих потоков переключился на его обработку), тогда создается дополнительно еще increment потоков. Это представлено на рисунке в виде первого этапа, обозначенного как «создание потока». |
increment | (Буквально — «приращение» — прим. ред.) Этот параметр определяет, сколько потоков должны быть созданы сразу, если число потоков, находящихся в режиме блокирования, становится меньше значения параметра lo_water. В выборе значения для этого параметра вы бы наиболее вероятно начали со значения 1 (единица). Это означало бы, что если бы число потоков в режиме блокирования стало бы меньше значения параметра lo_water, то пулом потоков был бы создан дополнительно ровно один поток. Для более тонкой настройки параметра increment можно понаблюдать за поведением процесса и определить, может ли этому параметру понадобиться принимать значения, отличные от единицы. Например, если ваш процесс периодически получает «всплески» запросов, то из того, что число потоков, находящихся в режиме блокирования, упало ниже значения lo_water, можно было бы сделать вывод как раз о таком «всплеске» и принять решение о создании более чем одного резервного потока. |
hi_water | (От «high watermark», буквально — «верхняя ватерлиния» — прим. ред.) Этот параметр указывает верхний предел числа потоков, которые могут быть в режиме блокирования одновременно. По мере завершения своих операций по обработке данных, потоки обычно будут возвращаться в режим блокирования. Однако, у библиотеки поддержки пулов потоков есть внутренний счетчик числа потоков, находящихся в режиме блокирования, и если его значение превышает значение параметра hi_water, библиотека автоматически уничтожит поток, который вызвал переполнение (то есть тот поток, который только что завершил обработку и намеревался возвратиться в режим блокирования). Это показано на рисунке раздвоением стрелки, исходящей из блока «режим обработки» — одна стрелка ведет к «режиму блокирования», а вторая — к блоку операции «CF» и далее на уничтожение потока. Таким образом, сочетание параметров lo_water и hi_water позволяет вам четко определять диапазон числа потоков, одновременно находящихся в режиме блокирования. |
maximum | Параметр указывает на максимальное число потоков, которые вообще могут работать одновременно в результате действий библиотеки поддержки пулов потоков. Например, при создании новых потоков в случае их нехватки (когда число блокированных потоков падает ниже границы lo_water) общее количество потоков было бы ограничено параметром maximum. |
Другой ключевой параметр, предназначенный для управления потоками, — это параметр flags, передаваемый функции thread_pool_create(). Он может принимать одно из следующих значений:
POOL_FLAG_EXIT_SELF
Не делать возврат из функции thread_pool_start() и не включать вызывающий поток в пул.
POOL_FLAG_USE_SELF
Не делать возврат из функции thread_pool_start(), но включить вызывающий поток в пул.
0
Функция thread_pool_start() возвратится, новые потоки будут создаваться по мере необходимости.
Приведенное описание может показаться суховатым. Давайте рассмотрим пример.
В управляющей структуре пула потоков сконцентрируем наше внимание только на значениях параметров lo_water, increment и maximum:
/*
* tp1.с
*
* Пример с пулами потоков (1)
*
*/
#include
#include
#include
#include
#include
char *progname = "tp1";
void tag (char *name) {
time_t t;
char buffer[BUFSIZ];
time(&t);
strftime(buffer, BUFSIZ, "%T ", localtime(&t));
printf("%s %3d %-20.20s: ", buffer, pthread_self(), name);
}
THREAD_POOL_PARAM_T* blockfunc(
THREAD_POOL_PARAM_T *ctp) {
tag("blockfunc");
printf("ctp %p\n", ctp);
tag("blockfunc");
printf("sleep (%d);\n", 15 * pthread_self());
sleep(pthread_self() * 15);
tag("blockfunc");
printf("Выполнили sleep\n");
tag("blockfunc");
printf("Возвращаем 0x%08X\n",
0x10000000 + pthread_self());
return((void*)(0x10000000 + pthread_self()));
// Передано handlerfunc
}
THREAD_POOL_PARAM_T* contextalloc(
THREAD_POOL_HANDLE_T *handle) {
tag("contextalloc");
printf("handle %p\n", handle);
tag("contextalloc");
printf("Возвращаем 0x%08X\n",
0x20000000 + pthread_self());
return ((void*)(0x20000000 + pthread_self()));
// Передано blockfunc
}
void contextfree(THREAD_POOL_PARAM_T *param) {
tag("contextfree");
printf("param %p\n", param);
}
void unblockfunc(THREAD_POOL_PARAM_T *ctp) {
tag("unblockfunc");
printf("ctp %p\n", ctp);
}
int handlerfunc(THREAD_POOL_PARAM_T *ctp) {
static int i = 0;
tag("handlerfunc");
printf("ctp %p\n", ctp);
if (i++ > 15) {
tag("handlerfunc");
printf("Более 15 операций, возвращаем 0\n");
return (0);
}
tag("handlerfunc");
printf("sleep (%d)\n", pthread_self() * 25);
sleep(pthread_self() * 25);
tag("handlerfunc");
printf("Выполнили sleep\n");
tag("handlerfunc");
printf("Возвращаем 0x%08X\n",
0x30000000 + pthread_self());
return (0x30000000 + pthread_self());
}
main() {
thread_pool_attr_t tp_attr;
void *tpp;
memset(&tp_attr, 0, sizeof(tp_attr));
tp_attr.handle = (void*)0x12345678;
// Передано contextalloc
tp_attr.block_func = blockfunc;
tp_attr.unblock_func = unblockfunc;
tp_attr.context_alloc = contextalloc;
tp_attr.context_free = contextfree;
tp_attr.handler_func = handlerfunc;
tp_attr.lo_water = 3;
tp_attr.hi_water = 7;
tp_attr.increment = 2;
tp_attr.maximum = 10;
if ((tpp =
thread_pool_create(&tp_attr, POOL_FLAG_USE_SELF)) ==
NULL) {
fprintf(stderr,
"%s: Ошибка thread_pool_create, errno %s\n",
progname, strerror(errno));
exit(EXIT_FAILURE);
}
thread_pool_start(tpp);
fprintf(stderr,
"%s: возврат из thread_pool_start; errno %s\n",
progname, strerror(errno));
sleep(3000);
exit(EXIT_FAILURE);
}
После установки параметров мы вызываем функцию thread_pool_create() для создания пула потоков. Эта функция возвращает указатель на управляющую структуру пула потоков (tpp), который мы проверяем на равенство NULL (что указало бы на ошибку). И, наконец, мы вызываем функцию thread_pool_start(), передав ей эту самую управляющую структуру tpp.
Я указал флаг POOL_FLAG_USE_SELF, что означает, что поток, вызвавший функцию thread_pool_start(), будет рассматриваться как доступный для ввода в пул. Таким образом, на момент старта пула в нем есть только один поток. Поскольку значение параметра lo_water равно 3, библиотека немедленно создаст еще increment потоков (в нашем случае — 2). С этого момента в пуле будет три (3) потока, и все они будут находиться в режиме блокирования. Условие по параметру lo_water удовлетворено, потому что число потоков в режиме блокирования действительно не меньше lo_water, условие по параметру hi_water удовлетворено, потому что число потоков в режиме блокирования действительно не больше hi_water; и, наконец, также удовлетворено условие по параметру maximum, потому что общее число потоков не превышает его значения. Допустим теперь, что один из потоков, находящихся в режиме блокирования, разблокируется (например, в серверном приложении — при получении сообщения). Это означает, что один из трех потоков перейдет из режима блокирования в режим обработки. Счетчик блокированных потоков уменьшится, и его значение упадет ниже значения параметра lo_water. Это переключит триггер lo_water и заставит библиотеку создать ещё increment (2) потоков. Таким образом, у нас будет всего 5 потоков (4 в режиме блокирования, и 1 — в режиме обработки).
Пусть далее разблокируется еще несколько потоков. Давайте предположим, что на этот момент еще ни один из потоков, находящихся в режиме обработки, еще не завершил свои дела. Ниже приведена таблица, в которой иллюстрируется весь процесс, начиная с исходного состояния:
Событие | Режим обработки | Режим блокирования | Всего потоков |
---|---|---|---|
Исходное состояние | 0 | 1 | 1 |
Срабатывание триггера lo_water | 0 | 3 | 3 |
Разблокирование | 1 | 2 | 3 |
Срабатывание триггера lo_water | 1 | 4 | 5 |
Разблокирование | 2 | 3 | 5 |
Разблокирование | 3 | 2 | 5 |
Срабатывание триггера lo_water | 3 | 4 | 7 |
Разблокирование | 4 | 3 | 7 |
Разблокирование | 5 | 2 | 7 |
Срабатывание триггера lo_water | 5 | 4 | 9 |
Разблокирование | 6 | 3 | 9 |
Разблокирование | 7 | 2 | 9 |
Срабатывание триггера lo_water | 7 | 3 | 10 |
Разблокирование | 8 | 2 | 10 |
Разблокирование | 9 | 1 | 10 |
Разблокирование | 10 | 0 | 10 |
Видно, что библиотека проверяет параметр lo_water, и по мере необходимости увеличивает число потоков на значение параметра increment, но только до тех пор, пока число потоков не достигнет предельного значения — параметра maximum (именно поэтому число в столбце «Всего потоков» никогда не превышает 10, даже когда условие по параметру lo_water перестает выполняться).
Это означает, что однажды наступает момент, когда потоков в режиме блокирования больше не остается. Предположим теперь, что потоки, находящиеся в режиме обработки, завершают свои дела. Посмотрим, что при этом произойдет с триггером параметра hi_water.
Событие | Режим обработки | Режим блокирования | Всего потоков |
---|---|---|---|
Завершение обработки | 9 | 1 | 10 |
Завершение обработки | 8 | 2 | 10 |
Завершение обработки | 7 | 3 | 10 |
Завершение обработки | 6 | 4 | 10 |
Завершение обработки | 5 | 5 | 10 |
Завершение обработки | 4 | 6 | 10 |
Завершение обработки | 3 | 7 | 10 |
Завершение обработки | 2 | 8 | 10 |
Срабатывание триггера hi_water | 2 | 7 | 9 |
Завершение обработки | 1 | 8 | 9 |
Срабатывание триггера hi_water | 1 | 7 | 9 |
Завершение обработки | 0 | 8 | 8 |
Срабатывание триггера hi_water | 0 | 7 | 7 |
Обратите внимание, что с потоками ничего не происходит до тех пор, пока число блокированных потоков не превышает значение hi_water. Реализация здесь такова: как только поток завершает обработку, он проверяет число блокированных на данный момент потоков, и если их слишком много (то есть больше, чем предусмотрено параметром hi_water), то «совершает самоубийство». Удобство использования параметров lo_water и hi_water в управляющих структурах состоит в том, что ими вы фактически задаете «эффективный диапазон» числа потоков, в пределах которого всегда доступно достаточное число потоков, и потоки без необходимости не создаются и не уничтожаются. В нашем случае, после выполнения действий, перечисленных в вышеупомянутых таблицах, мы имеем систему, которая способна обрабатывать до 4 запросов одновременно без необходимости в создании дополнительных потоков (7-4 = 3, что соответствует значению параметра lo_ water).
Теперь, когда мы достаточно хорошо владеем методикой управления числом потоков в пуле, давайте обратимся к другим элементам атрибутной записи пула потоков:
// Функции и дескриптор пула потоков
THREAD_POOL_HANDLE_T *handlе;
THREAD_POOL_PARAM_T *(*block_func)(
THREAD_POOL_PARAM_T *ctp);
void (*unblock_func)(THREAD_POOL_PARAM_T *ctp);
int (*handler_func)(THREAD_POOL_PARAM_T *ctp);
THREAD_POOL_PARAM_T *(*context_alloc)(
THREAD_POOL_HANDLE_T *handle);
void (*context_free)(THREAD_POOL_PARAM_T *ctp);
Повторно обратимся к рисунку «Жизненный цикл пула потоков». Из рисунка видно, что при создании потока каждый раз вызывается функция context_alloc(). (Аналогично, при уничтожении потока вызывается функция context_tree()). Элемент атрибутной записи с именем handler передается функции context_alloc() в качестве ее единственного параметра. Функция context_alloc() ответственна за индивидуальные настройки потока и возвращает указатель на контекст (списках параметров называемый ctp). Заметьте, что содержание этого указателя — исключительно ваша забота; библиотеке абсолютно все равно, что вы в него поместите.
Теперь, когда контекст создан функцией context_alloc(), вызывается функция block_func() для перевода потока в режим блокирования. Заметьте, что функция block_func() получает на вход результат работы функции context_alloc(). После того как функция block_func() разблокируется, она возвращает указатель на контекст, который библиотека передает функции handler_func(). Функция handler_func() отвечает за выполнение «работы» — например, в типовом варианте именно она обрабатывает сообщение от клиента. На данный момент принято, что функция handler_func() должна возвращать нуль — ненулевые значения зарезервированы QSSL для будущего функционального расширения. Функция unblock_func() также в настоящее время зарезервирована, поэтому просто оставьте там NULL.
Возможно, ситуацию немного прояснит приведенный ниже пример псевдокода (он основан все на том же рисунке «Жизненный цикл потока в пуле потоков»):
FOREVER DO
IF (#threads < lo_water) THEN
IF (#threads < maximum) THEN
create new thread
context = (*context_alloc)(handle);
ENDIF
ENDIF
retval = (*block_func)(context);
(*handler_func)(retval);
IF (#threads > hi_water) THEN
(*context_free)(context)
kill thread
ENDIF
DONE
Отметим, что приведенная выше программа излишне упрощена. Ее назначение состоит только в том, чтобы продемонстрировать вам поток данных по параметрам ctp и handler и дать вам некоторое представление об алгоритмах, которые обычно применяются для управления числом потоков.
До настоящего момента мы обсуждали дисциплины диспетчеризации и состояния потоков, но практически ничего не сказали относительно того, почему и когда происходит собственно перепланирование. Существует распространенное заблуждение, что перепланирование «просто случается», безо всяких реальных причин. И в общем-то, для проектирования это довольно полезная абстракция! Однако, очень важно понимать, почему происходит перепланирование. Вспомним рисунок «Схема алгоритма диспетчеризации» (в разделе «Роль ядра»).
Перепланирование может иметь только три причины:
• аппаратное прерывание;
• системный вызов;
• сбой (исключение).
Перепланирование из-за аппаратного прерывания можно разделить на две категории:
• по прерыванию от таймеров;
• по прерыванию от других аппаратных средств.
Часы реального времени генерируют периодические прерывания для ядра, организуя перепланирование во времени.
Например, если вы производите вызов
sleep(10)
, часы реального времени сгенерируют некоторое число прерываний; по каждому прерыванию ядро увеличивает значение системных часов. Когда системные часы покажут, что 10 секунд истекли, ядро перепланирует ваш поток, переведя его в состояние готовности (READY). (Мы рассмотрим этот вопрос более подробно в главе «Часы, таймеры и периодические уведомления»).
Другие потоки могут ожидать аппаратные прерывания от внешних устройств, таких как последовательный порт, жесткий диск или аудио платы. В этом случае они блокируются в ядре, ожидающем аппаратное прерывание. Поток будет переупорядочен ядром только после того, как ядро сгенерирует «событие».
Если поток делает системный вызов, перепланирование выполняется немедленно и может рассматриваться как асинхронное в отношении прерываний таймера и других прерываний.
Например, выше мы приводили пример вызова функции
sleep(10)
. Это библиотечная функция языка Си, в конечном счете она транслируется в системный вызов. В тот же самый момент ядро приняло решение о перепланировании, чтобы удалить ваш поток из очереди готовности по соответствующему приоритету и поставить на выполнение другой поток, находящийся в состоянии готовности (READY).
Системных вызов, вызывающи процесс обязательного перепланирования, очень много. Большинство их них достаточно очевидны. Перечислим некоторые из них:
• функции таймера (например, sleep());
• функции обмена сообщениями (например, MsgSendv());
• примитивы работы с потоками (например, pthread_cancel() или pthread_join()).
Последняя из вышеперечисленных причин перепланирования — это сбой процессора (CPU fault), который является исключительной ситуацией (exception) — чем-то средним между аппаратным прерыванием и системным вызовом. Исключительные ситуации асинхронны в отношении ядра (подобно прерыванию), но синхронны с вызывающими их пользовательскими программами (подобно вызову ядра — например, такая исключительная ситуация как деление на ноль). Все рассуждения, относящиеся к перепланированию по прерываниям от аппаратных средств и по системным вызовам, относятся и к исключительным ситуациям тоже.
Операционная система QNX/Neutrino предлагает богатые возможности диспетчеризации потоков — минимальных диспетчеризуемых единиц. Процесс в QNX/Neutrino определяется как минимальная единица, способная обладать ресурсами (например, областями памяти), и может содержать один или более потоков.
С потоками можно применять любые из следующих методов синхронизации:
• мутексы (mutexes) — владеть мутексом в заданный момент времени может только один поток;
• семафоры (semaphores) — владеть семафором позволяется некоторому фиксированному числу потоков;
• ждущие блокировки (sleepons) — позволяют нескольким потокам блокироваться на нескольких объектах, динамически назначая блокированным потокам соответствующие условные переменные;
• условные переменные (condvars) — подобны ждущим блокировкам, за исключением того, что за распределение условных переменных отвечает программист;
• присоединение (joining) — обеспечивает синхронизацию потока по отношению к завершению другого потока;
• барьеры (barriers) — позволяют потокам ждать, пока определенное число потоков не встретится в определенной точке.
Отметим, что мутексы, семафоры и условные переменные могут использоваться между потоками как в том же самом, так и в разных процессах, ждущие же блокировки могут применяться только между потоками одного и того же процесса (потому что системный мутекс библиотеки ждущих блокировок «скрыт» в адресном пространстве процесса).
Наряду с синхронизацией, потоки можно диспетчеризовать (используя приоритеты и различные дисциплины диспетчеризации), и они автоматически могут выполняться как в однопроцессорном блоке, так и в системе с архитектурой SMP.
Всякий раз, когда мы говорим о «создании процесса» (обычно как о средстве переноса однопоточного кода), мы действительно создаем адресное пространство с одним работающим в нем потоком — этот поток стартует по вызову функции main() или функций atfork() или vfork(), в зависимости от реализации.