Глава 2 Обмен сообщениями

Введение в обмен сообщениями

В данной главе мы рассмотрим наиболее характерную отличительную особенность QNX/Neutrino — механизм обмена сообщениями. Обмен сообщениями в QNX/Neutrino — ключевой механизм, глубоко интегрированный с микроядерной архитектурой этой операционной системы и обеспечивающий ей ее модульность.

Микроядро и обмен сообщениями

Одним из основных преимуществ QNX/Neutrino является то, что данная операционная система является масштабируемой. Под «масштабируемостью» здесь подразумевается, что данная система может быть адаптирована к работе как в крошечных встраиваемых системах с ограниченными ресурсами, так и в больших сетях симметричных многопроцессорных систем (SMP), т.е. систем, ресурсы которых практически неограничены.

В операционной системе QNX/Neutrino такой уровень универсальности достигается разнесением различных сервисов по отдельным модулям. Таким образом, вы имеете возможность включить в конечную систему только те компоненты, которые вам действительно необходимы. Используя многопоточность, вы также упрощаете своему проекту «масштабируемость вверх» для использования его в SMP-системах (в данной главе мы рассмотрим еще ряд полезных применений для потоков, которые мы не обсуждали ранее).

Эта концепция была изначально заложена во все ОС семейства QNX и соблюдается по сей день. Основным принципом построения этих систем является микроядерная архитектура, когда модули, которые в традиционной операционной системе были бы включены в состав ядра, рассматриваются как необязательные компоненты.

Модульная архитектура QNX/Neutrino.

Какие из модулей применить в проекте — это уже решать проектировщику, то есть вам. В вашем проекте необходима файловая система? Если да, включите ее в проект. Если нет — можете не включать. Вам необходим драйвер последовательного порта? Каков бы ни был ваш ответ, он не повлияет на предыдущее решение касательно файловой системы, равно как и не будет от него зависеть.

В процессе работы системы вы также имеете возможность изменять ее состав. Вы можете динамически удалять любые компоненты из работающей системы или добавлять их, в любой произвольный момент времени. Вы спросите, существуют ли какие-либо ограничения относительно «драйверов»? Нет, не существуют — драйвер в QNX/Neutrino является стандартной пользовательской программой, которая разве что выполняет определенные действия с оборудованием. Мы обсудим, как писать такие программы, в главе «Администраторы ресурсов».

Ключом к реализации всего этого является обмен сообщениями. Вместо встраивания модулей ОС непосредственно в ядро и обеспечения между ними некоего «специального» взаимодействия, в QNX/Neutrino модули общаются друг с другом посредством обмена сообщениями. Ядро в основном отвечает только за служебные функции на уровне потоков (например, за диспетчеризацию потоков). На самом деле, обмен сообщениями используется не только для трюков с динамической инсталляцией и деинсталляцией компонентов — на нем основаны большинство всех остальных служебных функций (например, распределение памяти выполняется путем отправки специализированного сообщения администратору процессов). Впрочем, конечно, некоторые сервисы реализуются непосредственно через системные вызовы.

Рассмотрим открытие файла и запись в него блока данных. Это реализуется с помощью ряда сообщений, посылаемых приложением такому опциональному компоненту ОС как файловая система. Сообщение указывает файловой системе открыть файл, затем другое сообщение указывает ей записать в него некие данные. И не волнуйтесь — обмен сообщениями в QNX/Neutrino происходит очень быстро.

Обмен сообщениями и модель «клиент/сервер»

Представьте, что приложение читает данные из файловой системы. На языке QNX это значит, что данное приложение — это клиент, запрашивающий данные у сервера.

Модель «клиент/сервер» позволяет говорить о нескольких рабочих состояниях процессов, связанных с обменом сообщениями (мы говорили о них в главе «Процессы и потоки»). Первоначально сервер ждет от кого-нибудь сообщение. В этот момент сервер, как говорят, должен быть в состоянии блокировки по приему (recieve-blocked) (оно также может обозначаться как RECV). Ниже приведен пример вывода программы

pidin
:

pid tid name    prio  STATE  Blocked

4  1  devc-pty  10r  RECEIVE    1

В приведенном примере сервер псевдотерминалов (называемый

devc-pty
) имеет идентификатор процесса 4, содержит один поток с идентификатором потока 1, выполняется с приоритетом 10, подчиняется диспетчеризации карусельного типа (RR) и находится в состоянии блокировки по приему, ожидая сообщения по каналу с идентификатором 1 (к «каналам» мы еще скоро вернемся).

Смена состояний сервера.

По получении сообщения сервер переходит в состояние готовности (READY) и становится способен выполнять работу. Если оказывается, что из всех процессов, находящихся в данный момент в состоянии готовности (READY), наш сервер имеет наивысший приоритет, он получит процессор и сможет выполнить какие-то действия. Поскольку это сервер, он анализирует поступившее сообщение и решает, что с ним делать. В некоторый момент времени сервер завершит обработку сообщения и затем «ответит» клиенту.

Перейдем теперь к клиенту. Изначально клиент работал самостоятельно, пока не решил послать сообщение. Клиент переключается при этом из состояния готовности (READY) в состояние либо блокировки по передаче (send-blocked), либо блокировки по приему (recieve-blocked), в зависимости от состояния сервера, которому было послано сообщение.

Смена состояний клиента

Скорее всего, вам чаще придется иметь дело с состоянием блокировки по приему (reply-blocked), чем с состоянием блокировки по передаче (send-blocked). Состояние блокировки по приему (reply-blocked) реально означает следующее:

Сервер принял сообщение и теперь обрабатывает его. В некоторый момент времени сервер завершит обработку и ответит клиенту. Клиент блокирован в ожидании этого ответа от сервера.

Сравните с состоянием блокировки по передаче (send-blocked):

Сервер все еще не принял сообщение — вероятно, потому что был занят обработкой другого, ранее поступившего сообщения. Когда сервер возвратится в состояние «приема» вашего (клиентского) сообщения, вы перейдете из состояния блокировки по передаче (send-blocked) в состояние блокировки по ответу (reply- blocked).

На практике, если вы наблюдаете процесс, блокированный по передаче (send-blocked), это означает одно из двух:

1. Вы запечатлели момент, когда сервер был занят обслуживанием некоего клиента и в это время получил еще один запрос.

Это нормальная ситуация — вы можете проверить это, повторно выполнив

pidin
. На сей раз вы, вероятно, сможете увидеть, что этот процесс уже более не блокирован по передаче.

2. В сервере проявилась какая-то внутренняя ошибка, и он больше не воспринимает запросы.

Когда это произойдет, вы сможете увидеть множество процессов, блокированных по передаче на этом сервере. Чтобы проверить это, выполните

pidin
снова и посмотрите, есть ли изменения в состоянии клиентских процессов.

Ниже приведен пример, в котором показан клиент в состоянии блокировки по ответу (reply-blocked) и сервер, по которому он блокирован:

  pid tid name         prio STATE  Blocked

   1  1 /nto/x86/sys/procnto  0f READY

   1  2 /nto/x86/sys/procnto  10r RECEIVE    1

   1  3 /nto/x86/sys/procnto  10r NANOSLEEP

   1  4 /nto/x86/sys/procnto  10r RUNNING

   1  5 /nto/x86/sys/procnto  15r RECEIVE    1

16426  1 esh          10r REPLY     1

В примере показано, что программа

esh
(встраиваемый командный интерпретатор) передала сообщение процессу с номером 1 (это ядро и администратор процессов,
procnto
) и теперь ждет ответа.

Ну вот, теперь вы знаете основы обмена сообщениями в архитектуре «клиент/сервер».

Не исключено, что вы сейчас думаете: «Так что, получается, чтобы открыть файл или записать данные, мне придется писать специализированные вызовы обмена сообщениями QNX/ Neutrino?!»

Нет, вам не придется программировать обмен сообщениями непосредственно — разве что если вам будет нужно копнуть совсем вглубь (об этом несколько позже). Действительно, позвольте мне показать Вам некоторую программу клиента, который делает передачу сообщений:

#include 

#include 


int main(void) {

 int fd;

 fd = open("filename", O_WRONLY);

 write(fd, "Это обмен сообщениями\n", 24);

 close(fd);

 return (EXIT_SUCCESS);

}

Видите? Обычная Си-программа, никаких хитростей.

Собственно обмен сообщениями реализован в Си-библиотеке QNX/Neutrino. Вы просто выдаете вызовы по стандарту POSIX 1003.1 и вызовы функций ANSI Си и Си-библиотека делает за Вас всю работу, связанную с обменом сообщениями.

В приведенном выше примере вызываются три функции, и посылаются три различных сообщения:

open() — передала сообщение «open» («открыть»);

write() — передала сообщение «write» («записать»);

close() — передала сообщение «close» («закрыть»).

Мы обсудим сами сообщения более подробно, когда мы будем изучать администраторы ресурсов (в главе «Администраторы ресурсов»), а пока что единственное, что нам надо знать об этом — это сам факт, что были переданы сообщения различных типов.

Давайте на мгновение отвлечемся и сравним этот подход с тем, как бы это работало в традиционной операционной системе.

Клиентская программа осталась бы такой же — различия были бы скрыты в Си-библиотеке, поставляемой производителем программного обеспечения. В такой системе функция open() сделала бы системный вызов, ядро затем обратилось бы непосредственно к файловой системе, которая, в свою очередь, выполнила некоторые действия и возвратила бы дескриптор файла. Вызовы функций write() и close() работали бы аналогично

Итак? Есть ли преимущества в способе, который предлагает QNX/Neutrino? «Оставайтесь с нами!»

Распределенный обмен сообщениями

Предположим, что мы пожелали изменить приведенный выше пример, чтобы можно было «поговорить» с другим узлом сети. Вы, наверное, думаете, что для этого придется вызывать специальные функции, чтобы «попасть в сеть». Вот сетевой вариант нашей программы:

#include 

#include 


int main(void) {

 int fd;

 fd = open("/net/wintermute/home/rk/filename", O_WRONLY);

 write(fd, "Это обмен сообщениями\n", 24);

 close(fd);

 return (EXIT_SUCCESS);

}

Вы будете правы, если скажете, что в обеих версиях программы почти одинаковы. Так и есть.

В традиционной ОС функция open() библиотеки Си вызывает ядро, которое анализирует имя файла и говорит: «Опа! Это не на нашем узле…» Ядро затем вызывает сетевую файловую систему NFS, которая уже определяет, где в действительности находится файл

/net/wintermute/home/rk/filename
. Затем, NFS вызывает сетевой драйвер и посылает сообщение ядру на узле
wintermute
, которое повторяет весь процесс, описанный нами в нашем первоначальном примере. Заметьте, что в этом случае оказываются вовлеченными две файловые системы, одна из которых — сетевая файловая система (NFS) клиента, а вторая — удаленная. К сожалению, в зависимости от реализации как удаленной файловой системы, так и NFS, некоторые операции (например, блокировки файлов) могут работать некорректно из-за неполной совместимости.

В QNX/Neutrino функция open() Си-библиотеки создает точно такое же сообщение, какое она создала бы для локальной файловой системы, и посылает его файловой системе узла

wintermute
. Локальная и удаленная файловые системы при этом абсолютно одинаковы.

Это и есть еще одна фундаментальная особенность QNX/Neutrino: распределенные операции выполняются в ней абсолютно «непринужденно», поскольку потребности клиентов изначально абстрагированы от служебных функций, обеспечиваемых серверами, благодаря механизму обмена сообщениями.

В традиционном ядре действует «двойной стандарт», когда локальные сервисы реализуются одним способом, а удаленные (сетевые) — совершенно другим.

Что это означает для вас

Обмен сообщениями в QNX/Neutrino элегантно реализован и распределен по сети. И что? Что с этого нам, программистам?

Ну, в первую очередь это означает, что ваши программы унаследуют от ОС те же характеристики — сделать их распределенными будет гораздо проще, чем в других ОС. Но самое полезное, на мой взгляд, преимущество заключается в том, что эта схема обеспечивает модульную структуру программного обеспечения, тем самым значительно упрощая процесс отладки и тестирования.

Вам, вероятно, доводилось участвовать в больших проектах, когда множество людей разрабатывают различные фрагменты целевого программного обеспечения. Разумеется, при этом кто-то опережает график, а кто-то запаздывает.

У таких проектов проблемы чаще всего возникают на двух этапах: на первоначальном, при распределении отдельных частей проекта между конкретными исполнителями, а также на этапе тестирования и интеграции, когда невозможно провести комплексные испытания системы из-за недоступности всех необходимых компонентов.

С использованием принципа обмена сообщениями развязать друг от друга отдельные компоненты проекта становится очень просто, что ведет к значительному упрощению как самого проекта, так и технологии тестирования. Если говорить об этом в терминах существующих парадигм, данный подход очень похож на концепции, применяемые в объектно-ориентированном программировании (ООП).

К чему все это сводится? К тому, что тестирование можно выполнять поэтапно. Вы сможете написать простенькую программку, которая посылает сообщения вашему серверному процессу, а поскольку его входы и выходы являются (или должны быть!) хорошо задокументированными, то вы сможете сразу определить, работает он или нет. Черт возьми, можно даже создать типовые тестовые наборы, включить их в комплект для регрессивного тестирования и выполнять его в автоматическом режиме!

Философия QNX/Neutrino

Принципы обмена сообщениями лежат в самой основе философии QNX/Neutrino. Понимание смысла и приемов применения обмена сообщениями будет ключом к наиболее эффективному использованию ОС. Прежде чем углубиться в детали, давайте рассмотрим немного теории.

Обмен сообщениями и многопоточность

При том, что модель «клиент/сервер» проста для понимания и очень широко используется, существуют две вариации на данную тему. Первая — многопоточная реализация (об этом речь в данной главе), вторая — так называемая модель «сервер/субсервер», иногда полезная и в обычных разработках, но в полной мере раскрывающая свои преимущества при проектировании распределенных систем. Сочетание этих двух концепций предоставляет колоссальную мощь, особенно в сетях симметричных мультипроцессорных систем!

Как мы уже обсуждали в главе «Процессы и потоки», QNX/Neutrino позволяет реализовать множество потоков в одном и том же процессе. Какие преимущества это нам даст в сочетании с механизмом обмена сообщениями?

Ответ здесь довольно прост. Мы можем стартовать пул потоков (используя функции семейства thread_pool_*(), о которых мы говорили в разделе «Процессы и потоки»), каждый из которых сможет обрабатывать сообщения от клиентов:

Обслуживание клиентов различными потоками сервера

С этой точки зрения нам абсолютно все равно, которому именно потоку достанется на обработку отправленное клиентом сообщение — главное, чтобы работа была выполнена. Это имеет ряд преимуществ. Способность обслуживать нескольких клиентов отдельными потоками обработки является очень мощной концепцией по сравнению с применением лишь одного потока. Главное преимущество состоит в том, что задача распараллеливания обработки перелагается на ядро, избавляя сам сервер от необходимости реализовывать параллельную обработку самостоятельно.

Когда несколько потоков работают на машине с единственным процессором, это значит, что все эти потоки будут конкурировать друг с другом за процессорное время.

Однако, в SMP-блоке мы можем сделать так, чтобы потоки конкурировали не за один, а за несколько процессоров, используя при этом одну и ту же общую область данных. Это означает, что здесь мы будем ограничены исключительно числом доступных процессоров.

Модель «сервер/субсервер»

Давайте теперь рассмотрим модель «сервер/субсервер», а затем наложим ее на модель многопоточности.

В соответствием с моделью «сервер/субсервер», сервер по- прежнему обеспечивает обслуживание клиентуры, но поскольку обслуживание запросов может занимать слишком много времени, мы должны быть способны поставить запрос на обработку и при этом не потерять способность обрабатывать новые запросы, продолжающие поступать от других клиентов.

Если бы мы попытались реализовать эту задачу с применением традиционной однопоточной модели «клиент/сервер», то после получения одного запроса и начала его обработки мы потеряли бы способность воспринимать другие запросы — нам приходилось бы периодически прекращать обработку, проверять, есть ли еще запросы, помещать таковые (если они есть) в очередь заданий и затем продолжать обработку, уже распыляя внимание на обработку всевозможных заданий, находящихся в очереди. Не очень-то эффективно. Фактически мы здесь дублируем работу ядра путем реализации дополнительного квантования времени между заданиями!

Представьте себе, каково вам было бы делать все это самому. Вы сидите за своим рабочим столом в офисе, и кто-то приносит вам полную папку заданий на выполнение. Вы начинаете их выполнять. Вы по уши заняты работой, и тут в дверном проеме появляется кто-то еще, с еще одним списком не менее первоочередных заданий. Теперь у вас на рабочем столе имеете два списка неотложных дел, и вы продолжаете работать, стараясь выполнить все. Вы тратите несколько минут на работы из одного списка, потом переключаете внимание на другой, и так далее, периодически посматривая на дверной проем, на случай если кто- нибудь принесет еще.

В такой ситуации было бы гораздо разумнее как раз применить модель «сервер/субсервер». В соответствии с этой моделью, у нас есть сервер, который создает ряд других процессов (субсерверов). Каждый из этих субсерверов посылает сообщение серверу, но сервер не отвечает им, пока не получит запрос от клиента. Затем сервер передает запрос клиента одному из субсерверов, отвечая на его сообщение заданием, которое субсервер обязан выполнить. Этот процесс показан на приведенном ниже рисунке. Отметьте для себя направления стрелок — они соответствуют направлениям передачи сообщений!

Модель «сервер/субсервер».

Если бы вы делали что-то подобное, вы бы, скорее всего, наняли дополнительно несколько служащих. Эти служащие все пришли бы к вам (как субсерверы посылают сообщение серверу — отсюда направление стрелок на рисунке) в поисках, чего бы такого сделать. Первоначально у вас могло и не быть для них никакой работы — в таком случае их запросы остались бы без ответа. Но теперь, когда кто-нибудь принесет вам кипу бумаг, вы скажете одному из ваших подчиненных: «Это тебе!» — и подчиненный пойдет заниматься делом. Аналогично, по мере поступления других заданий вы и далее будете делегировать их остальным подчиненным.

Хитрость этой модели заключается в том, что она является управляемой по ответу (reply-driven) — выполнение задания начинается с вашего ответа (reply) субсерверу. Стандартная же модель «клиент/сервер» является управляемой по запросу (send-driven), поскольку работа начинается с передачи сообщения серверу.

Но почему клиенты приходят именно к вам в офис, а не в офисы нанятых вами работников? Почему именно вы распределяете работу? Ответ довольно прост: вы — координатор, ответственный за определенную задачу, и ваша обязанность — гарантировать ее выполнение. Ваши клиенты, заказывающие вам работу, знают Вас, но не знают ни имен, ни местонахождения ваших (возможно, временных) работников.

Как вы, вероятно, и подозревали, концепцию единого многопоточного сервера и модель «сервер/субсервер» можно комбинировать. Главная хитрость при этом будет в определении того, какие части задачи лучше было бы распределить по машинам в сети (обычно это касается компонентов системы, которые не генерируют большого трафика), а какие — по процессорам SMP-архитектур (чаще всего это элементы, требующие наличия разделяемой области данных).

Зачем можно было бы комбинировать эти два метода? Используя подход «сервер/субсервер», мы сможем распределять работу между узлами сети. Это в действительности означает, что мы ограничены только числом доступных в сети машин (ну, и полосой пропускания сети, разумеется).

Объединение этот подход с принципом распределения потоков по различным процессорам в архитектурах SMP, мы получим «вычислительный кластер», где центральный «арбитр» распределяет работу (в модели «сервер/субсервер») между блоками SMP, объединенными в сеть.

Несколько примеров

Рассмотрим теперь несколько примеров применения каждого метода.

Режим с управлением по запросу (send-driven) — модель «клиент/сервер»

Файловая система, последовательные порты, консоли и звуковые платы — все это примеры применения модели «клиент/сервер». Прикладная программа на языке Си берет на себя роль клиента и посылает запросы этим серверам. Серверы выполняют работу и отвечают клиентам.

Некоторые из этих «обычных» серверов, однако, в действительности могут быть серверами, управляемыми по ответу (reply-driven)! Это возможно, например, в случае, когда по отношению к конечному клиенту они выглядят как стандартные серверы, а вот работу выполняют по методике «сервер/ субсервер». То есть я имею в виду, что клиент по-прежнему посылает сообщение тому, кого считает «серверным процессом», а тот просто передает работу другому процессу (субсерверу).

Режим с управлением по ответу (reply-driven) — модель «сервер/субсервер»

Один из наиболее популярных примеров программы, управляемой по ответу (reply-driven), — это программа фрактальной графики, распределенная по сети. Ведущая программа делит экран на несколько зон — например, на 64 зоны. При старте ведущей программе задается список узлов, которые могут участвовать в работе. Затем ведущая программа запускает рабочие программы (субсерверы), по одной на каждый узел, и ждет от них сообщений.

Затем ведущая программа по очереди берет «незаполненные» зоны (из имеющихся 64) и передает задачу фрактальных вычислений программе-исполнителю на другом узле, отвечая ей на ее сообщение. Когда рабочая программа завершит вычисления, она посылает результаты обратно ведущей, которая выводит их на экран.

Поскольку программа-исполнитель передала результаты ведущей программе путем отправки ей сообщения, она теперь снова готова получить от нее ответ с новым заданием. Ведущая программа так и делает до тех пор, пока все 64 зоны на экране не будут заполнены.

Важная тонкость

Поскольку ведущая программа отвечает за распределение работы между программами-исполнителям, она не может себе позволить быть заблокированной! При традиционном подходе с управлением по запросу (send-driven) ведущая программа должна была бы создать программу-исполнителя и послать ей сообщение. К сожалению, при этом ведущая программа не сможет получить ответ до тех пор, пока программа-исполнитель не выполнит свою работу, а значит, не сможет и передать сообщение другой программе-исполнителю. Это сразу сводит на нет все преимущества наличия нескольких рабочих программ на разных узлах.

Один ведущий, несколько исполнителей

Решение этой проблемы заключается в том, чтобы исполнители при старте запросили ведущего, есть ли для них работа, послав ему сообщение. Напомним еще раз, что направление стрелок на рисунке указывает направление передачи. Теперь исполнители ждут ответа от ведущего. Когда какой-нибудь клиент «заказывает работу» ведущему, тот отвечает одному или более из исполнителей, что указывает им выйти из ожидания и начать выполнение. Это позволяет исполнителям заботиться о своих делах самостоятельно, а ведущий сохраняет возможность отвечать на новые запросы, поскольку не блокируется в ожидании ответа от исполнителей.

Многопоточный сервер

С позиции клиента многопоточные серверы неотличимы от однопоточных. Фактически, разработчик сервера может запросто «включить многопоточность», запустив еще один или несколько потоков.

В любом случае, сервер может по-прежнему использовать несколько процессоров в SMP-системе, даже если «клиент» у него только один.

Что это означает? Давайте вернемся к примеру о фрактальной графике. Когда субсервер получает от сервера запрос на «вычисления», ему ничто не мешает запустить несколько потоков и начать обработку данного запроса на нескольких процессорах сразу. На самом деле, чтобы приложение лучше масштабировалось в сетях, в которых есть как мультипроцессоры SMP, так и однопроцессорные машины, сервер и субсервер могут сначала обменяться информацией о том, сколько у субсервера имеется в распоряжении процессоров. Это даст серверу знать, сколько запросов субсервер может обслужить одновременно. Тогда сервер сможет перенаправлять многопроцессорным субсерверам больше запросов, чем однопроцессорным, равномерно распределяя нагрузку между вычислительными мощностями.

Применение обмена сообщениями

Теперь, когда мы рассмотрели базовые концепции обмена сообщениями и выяснили, что он используется даже в таких обычных повседневных вещах как библиотека языка Си, давайте рассмотрим кое-какие детали.

Архитектура и структура

Мы рассуждали о «клиентах» и «серверах». Я также использовал три ключевые выражения:

• «Клиент посылает (sends) сообщение серверу»;

• «Сервер принимает (receives) сообщение от клиента»;

• «Сервер отвечает (replies) клиенту».

Я преднамеренно использовал именно эти выражения, потому что они в точности соответствуют действительным именам функций, которые используются для передачи сообщений в QNX/Neutrino.

Ниже приводится (в алфавитном порядке) полный список функций QNX/Neutrino, относящихся к обмену сообщениями:

ChannelCreate(), ChannelDestroy();

ConnectAttach(), ConnectDetach();

MsgDeliverEvent();

MsgError();

MsgRead(), MsgReadv();

MsgRecieve(), MsgRecievePulse(), MsgRecievev();

MsgReply(), MsgReplyv();

MsgSend(), MsgSendc(), MsgSendsv(), MsgSendsvnc(), MsgSendvs(), MsgSendvsnc(), MsgSendv(), MsgSendvnc();

MsgWrite(), MsgWritev().

Пусть вас не приводит в замешательство размер списка функций. Вы запросто сможете писать приложения «клиент/сервер», используя лишь небольшое подмножество этого списка, просто по мере углубления в детали вы поймете, что некоторые из вышеперечисленных функций могут оказаться очень полезными в определенных случаях.

Минимальный полезный набор функций включает в себя функции ChannelCreate(), ConnectAttach(), MsgReply(), MsgSend() и MsgRecieve().

Разобьем обсуждение на две части: отдельно обсудим функции, которые применяются на стороне клиента, и отдельно — те, что применяются на стороне сервера.

Клиент

Клиент, который желает послать запрос серверу, блокируется до тех пор, пока сервер не завершит обработку запроса. Затем, после завершения сервером обработки запроса, клиент разблокируется, чтобы принять «ответ».

Это подразумевает обеспечение двух условий: клиент должен «уметь» сначала установить соединение с сервером, а потом обмениваться с ним данными с помощью сообщений — как в одну сторону (запрос — «send»), так и в другую (ответ — «reply»).

Установление соединения

Итак, рассмотрим теперь функции по порядку. Первое, что мы должны сделать — это установить соединение. Это мы сделаем с помощью функции ConnectAttach(), описанной следующим образом:

#include 


int ConnectAttach(int nd, pid_t pid, int chid,

 unsigned index, int flags);

Функции ConnectAttach() передаются три идентификатора: идентификатор nd — дескриптор узла (Node Descriptor), идентификатор pid — идентификатор процесса (process ID) и идентификатор chid — идентификатор канала (channel ID).

Вместе эти три идентификатора, которые обычно записываются в виде «ND/PID/CHID», однозначно идентифицируют сервер, с которым клиент желает соединиться. Аргументы index и flags мы здесь просто проигнорируем (установим их в ноль).

Итак, предположим, что мы хотим подсоединиться к процессу, находящемуся на нашем узле и имеющего идентификатор 77, по каналу с идентификатором 1. Ниже приведен пример программы для выполнения этого:

int coid;

coid = ConnectAttach(0, 77, 1, 0, 0);

Можно видеть, что присвоением идентификатору узла (nd) нулевого значения мы сообщаем ядру о том, что мы желаем установить соединение на локальном узле.

Как я узнал, что соединиться надо с процессом 77 и по каналу 1? К этому мы скоро вернемся (см. ниже «Поиск сервера по ND/PID/CHID»).

С этого момента у меня есть идентификатор соединения — небольшое целое число, которое однозначно идентифицирует соединение моего клиента с конкретным сервером по заданному каналу.

Я смогу применять этот идентификатор для отправки запросов серверу сколько угодно раз. Выполнив все, для чего предназначалось соединение, я смогу уничтожить его с помощью функции:

ConnectDetach(coid);

Итак, давайте рассмотрим, как я воспользуюсь этим на практике.

Передача сообщений (sending)

Передача сообщения со стороны клиента осуществляется применением какой-либо функции из семейства MsgSend*().

Мы рассмотрим это на примере простейшей из них — MsgSend():

#include 


int MsgSend(int coid, const void *smsg, int sbytes,

 void *rmsg, int rbytes);

Аргументами функции MsgSend() являются :

• идентификатор соединения с целевым сервером (coid);

• указатель на передаваемое сообщение (smsg);

• размер передаваемого сообщения (sbytes);

• указатель на буфер для ответного сообщения (rmsg);

• размер ответного сообщения (rbytes);

Что может быть проще!

Передадим сообщение процессу с идентификатором 77 по каналу 1:

#include 


char *smsg = "Это буфер вывода";

char rmsg[200];

int coid;


// Установить соединение

coid = ConnectAttach(0, 77, 1, 0, 0);

if (coid == -1) {

 fprintf(stderr, "Ошибка ConnectAttach к 0/77/1!\n");

 perror(NULL);

 exit(EXIT_FAILURE);

}


// Послать сообщение

if (MsgSend(

 coid, smsg, strlen(smsg) + 1, rmsg, sizeof(rmsg)) == -1) {

 fprintf (stderr, "Ошибка MsgSend\n");

 perror(NULL);

 exit (EXIT_FAILURE);

}

if (strlen(rmsg) > 0) {

 printf("Процесс с ID 77 возвратил \"%s\"\n", rmsg);

}

Предположим, что процесс с идентификатором 77 был действительно активным сервером, ожидающим сообщение именно такого формата по каналу с идентификатором 1. После приема сообщения сервер обрабатывает его и в некоторый момент времени выдает ответ с результатами обработки. В этот момент функция MsgSend() должна возвратить ноль (0), указывая этим, что все прошло успешно. Если бы сервер послал нам в ответ какие-то данные, мы смогли бы вывести их на экран с помощью последней строки в программе (с тем предположением, что обратно мы получаем корректную ASCIIZ-строку).

Сервер

Теперь, когда мы рассмотрели клиента, перейдем к серверу. Клиент использовал функцию ConnectAttach() для создания соединения с сервером, а затем использовал функцию MsgSend() для передачи сообщений.

Создание канала

Под этим подразумевается, что сервер должен создать канал — то, к чему присоединялся клиент, когда вызывал функцию ConnectAttach(). Обычно сервер, однажды создав канал, приберегает его «впрок».

Канал создается с помощью функции ChannelCreate() и уничтожается с помощью функции ChannelDestroy():

#include 


int ChannelCreate(unsigned flags);

int ChannelDestroy(int chid);

Мы еще вернемся к обсуждению аргумента flags (в разделе «Флаги каналов», см. ниже), а покамест будем использовать для него значение 0 (ноль). Таким образом, для создания канала сервер должен сделать так:

int chid;

chid = ChannelCreate(0);

Теперь у нас есть канал. В этом пункте клиенты могут подсоединиться (с помощью функции ConnectAttach()) к этому каналу и начать передачу сообщений:

Связь между каналом сервера и клиентским соединением.

Обработка сообщений

В терминах обмена сообщениями, сервер отрабатывает схему обмена в два этапа — этап «приема» (receive) и этап «ответа» (reply).

Взаимосвязь функций клиента и сервера при обмене сообщениями.

Обсудим сначала два простейших варианта соответствующих функций, MsgReceive() и MsgReply(), а далее посмотрим, какие есть варианты.

#include 


int MsgReceive(int chid, void *rmsg, int rbytes,

 struct _msg_info *info);

int MsgReply(int rcvid, int status, const void *msg,

 int nbytes);

Посмотрим, как соотносятся параметры:

Поток данных при обмене сообщениями.

Как видно из рисунка, имеются четыре элемента, которые мы должны обсудить:

1. Клиент вызывает функцию MsgSend() и указывает ей на буфер передачи (указателем smsg и длиной sbytes). Данные передаются в буфер функции MsgReceive() на стороне сервера, по адресу rmsg и длиной rbytes. Клиент блокируется.

2. Функция MsgReceive() сервера разблокируется и возвращает идентификатор отправителя rcvid, который будет впоследствии использован для ответа. Теперь сервер может использовать полученные от клиента данные.

3. Сервер завершил обработку сообщения и теперь использует идентификатор отправителя rcvid, полученный от функции MsgReceive(), передавая его функции MsgReply(). Заметьте, что местоположение данных для передачи функции MsgReply() задается как указатель на буфер (smsg) определенного размера (sbytes). Ядро передает данные клиенту.

4. Наконец, ядро передает параметр sts, который используется функцией MsgSend() клиента как возвращаемое значение. После этого клиент разблокируется.

Вы, возможно, заметили, что для каждой буферной передачи указываются два размера (в случае запроса от клиента клиента это sbytes на стороне клиента и rbytes на стороне сервера; в случае ответа сервера это sbytes на стороне сервера и rbytes на стороне клиента). Это сделано для того, чтобы разработчики каждого компонента смогли определить размеры своих буферов — из соображений дополнительной безопасности.

В нашем примере размер буфера функции MsgSend() совпадал с длиной строки сообщения. Давайте теперь рассмотрим, что происходит в сервере и как размер используется там.

Структура сервера

Вот общая структура сервера:

#include 


...


void server(void) {

 int rcvid; // Указывает, кому надо отвечать

 int chid; // Идентификатор канала

 char message[512]; // Достаточно велик

 // Создать канал

 chid = ChannelCreate(0);

 // Выполняться вечно — для сервера это обычное дело

 while (1) {

  // Получить и вывести сообщение

  rcvid = MsgReceive(chid, message, sizeof(message), NULL);

  printf("Получил сообщение, rcvid %X\n", rcvid);

  printf("Сообщение такое: \"%s\".\n", message);

  // Подготовить ответ — используем тот же буфер

  strcpy(message, "Это ответ");

  MsgReply(rcvid, EOK, message, sizeof(message));

 }

}

Как видно из программы, функция MsgReceive() сообщает ядру о том, что она может обрабатывать сообщения размером вплоть до

sizeof(message)
(или 512 байт). Наш клиент (представленный выше) передал только 28 байт (длина строки). На приведенном ниже рисунке это и показано:

Передача меньшего объема данных, чем предполагается.

Ядро реально передает минимум из двух указанных размеров. В нашем случае ядро передало бы 28 байт, сервер бы разблокировался и отобразил сообщение клиента. Оставшиеся 484 байта (из буфера длиной 512 байт) остались бы нетронутыми.

Аналогичная ситуация с функцией MsgReply(). Функция MsgReply() информирует, что собирается передать 512 байт, но функция MsgSend() определила, что может принять максимум 200 байт. Ядро опять передает минимум. В этом случае 200 байтов, которые клиент может принять, ограничивают размер передачи. (Один интересный аспект здесь состоит в том, что когда сервер передаст данные, то если клиент не примет их целиком, как в нашем примере, их уже нельзя будет вернуть — они будут потеряны.).

Имейте в виду, что такое «урезание» является стандартным и ожидаемым поведением.

Когда мы будем обсуждать обмен сообщениями по сети, вы увидите, что в количестве передаваемых данных есть кое-какое «ага». Мы проанализируем это далее в разделе «Особенности обмена сообщениями в сети».

Иерархический принцип обмена (send-иерархия)

В обмене сообщениями есть одна вещь, которая, возможно, не является очевидной — это необходимость следовать строгой иерархии обмена. Означает это то, что два потока никогда не должны посылать сообщения друг другу; наоборот, они должны быть организованы так, что каждый поток занимал свой «уровень иерархии», и все потоки данного уровня должны посылать сообщения только потокам более низкого уровня, а не своего или высшего. Проблема с наличием двух потоков, которые посылают сообщения друг другу, заключается в том, что в конечном счете вы столкнетесь с проблемой взаимной блокировки (deadlock), когда оба потока ожидают друг от друга ответ на соответствующие сообщения. Поскольку эти потоки блокированы, то они никогда не будут поставлены на выполнение, а значит, не смогут дать друг другу ответ, и вы в результате получите два (а то и более!) зависших потока.

Способ назначения потокам уровней иерархии заключается в том, чтобы разместить наиболее удаленную клиентуру на самом верхнем уровне и работать оттуда. Например, если у вас есть графический интерфейс пользователя, который использует некоторый сервер баз данных, который, в свою очередь, использует файловую систему, а файловая система использует блок-ориентированный драйвер файловой системы, то у вас получается естественная иерархия процессов.

Передачи (sends) при обмене сообщениями будут направлены от клиента (графического интерфейса пользователя) вниз к серверам нижнего уровня; ответы на сообщения (replies) будут иметь встречное направление.

При том, что это работает в большинстве случаев, вы можете столкнуться и с ситуацией, когда вам придется нарушить иерархию обмена. Это никогда не следует выполнять простым нарушением иерархии, направляя сообщения «против течения» — для этого существует функция MsgDeliverEvent(), о которой речь несколько позже.

Идентификаторы отправителя, каналы и другие параметры

Мы с вами пока не обсуждали различные параметры, используемые в ранее рассмотренных примерах, чтобы можно было сконцентрировать внимание на самих принципах обмена сообщениями. Теперь поговорим об этих параметрах более подробно.

Дополнительно о каналах

В приведенном выше примере с сервером мы видели, что сервер создал один-единственный канал. Конечно, можно было создать больше, но обычно серверы так не делают. (Наиболее очевидный пример сервера с двумя каналами — это администратор штатной сети

qnet
— вот уж определенно эксцентричный образец программного обеспечения !)

Оказывается, что в действительности нет большой необходимости в создании нескольких каналов. Главное назначение канала состоит в том, чтобы четко указать серверу, где «слушать» на предмет входящих сообщений, и четко указать клиентам, куда передавать сообщения (через соответствующие соединения). Единственная ситуация, когда вам могло бы понадобиться использовать несколько каналов в сервере, — это если бы хотели реализовать сервер, предоставляющий различные услуги (или различные классы услуг) в зависимости от того, по какому каналу было принято сообщение. Второй канал мог бы применяться, например, для отправки сообщений типа «импульс», пробуждающих субсерверы — это гарантировало бы развязку этого сервиса от служебных функций, предоставляемых обычными сообщениями по первому каналу.

В предыдущем параграфе я утверждал, что вы могли бы использовать в сервере пул потоков, готовый принимать сообщения от клиентов, и что реально не имеет значения, который именно из потоков в пуле получит запрос. Это еще один аспект «канальной абстракции». В предыдущих версиях QNX (особенно в QNX4), клиент мог передать сообщение серверу, определяя его идентификатором узла (node ID) и идентификатором процесса (process ID) на этом узле. Поскольку QNX4 — однопоточная ОС, никакого беспорядка с тем, кому передается сообщение, в ней быть не могло. Однако, стоит ввести понятие потока, и встает дополнительная проблема адресации потоков в процессе (ведь именно потоки собственно предоставляют сервисы). Поскольку поток — вещь преходящая, в действительности для клиента не имеет смысла подключаться к четко определенному потоку в четко определенном процессе на четко определенном узле. К тому же, а что если нужный поток занят? Мы тогда должны были бы обеспечить клиенту возможность выбрать «незанятый поток из некоторого пула потоков, предоставляющих нужный сервис».

Так вот, для этого и существуют каналы. Канал — это «адрес» некоторого «пула потоков, предоставляющих нужный сервис». Суть здесь заключается в том, что вызвать функцию MsgReceive() по одному и тому же каналу могут несколько потоков одновременно. Все они будут блокированы, но входящее сообщение будет передано только одному из них.

Кто послал сообщение?

Довольно часто серверу необходимо знать, кто послал ему сообщение. Для этого есть ряд причин, например:

• учет клиентов;

• управление доступом;

• определение контекстных связей;

• выбор типа сервиса;

• и т.д.

Сделать так, чтобы клиент передавал серверу эту информацию с каждым сообщением, было бы излишне громоздким (да и давало бы лишние лазейки в системе защиты). Поэтому существует специальная структура, заполняемая ядром всякий раз, когда функция MsgReceive() разблокируется, приняв сообщение. Эта структура имеет тип

struct _msg_info
и содержит в себе следующее:

struct _msg_info {

 int nd;

 int srcnd;

 pid_t pid;

 int32_t chid;

 int32_t scoid;

 int32_t coid;

 int32_t msglen;

 int32_t tid;

 int16_t priority;

 int16_t flags;

 int32_t srcmsglen;

};

Вы передаете все это функции MsgReceive() в качестве последнего параметра. Если вы передаете NULL, то не произойдет ничего. (Информацию все равно можно будет потом получить с помощью вызова функции MsgInfo() — она не теряется!)

Давайте взглянем на поля этой структуры:

nd, srcnd, pid и tid Это дескриптор узла, идентификатор процесса и идентификатор потока клиента. (Заметьте, что nd — это дескриптор принимающего узла для режима передачи, a srcnd — это дескриптор передающего узла для режима приема. Для этого имеется очень серьезное основание ;-), которое мы рассмотрим ниже в разделе «Несколько замечаний о дескрипторах узлов»).
priority Приоритет потока, пославшего сообщение.
chid, coid Идентификатор канала, по которому сообщение было передано, и идентификатор использованного при этом соединения.
scoid Идентификатор соединения с сервером. Это внутренний идентификатор, который применяется ядром для маршрутизации сообщения от сервера назад к клиенту Вам не нужно ничего знать об этом идентификаторе, кроме одного любопытно факта, что это будет небольшое целое число, которое уникально идентифицирует клиента.
flags Содержит различные битовые флаги: _NTO_MI_ENDIAN_BIG, _NTO_MI_ENDIAN_DIFF, _NTO_MI_NET_CRED_DIRTY и _NTO_MI_UNBLOCK_REQ. Биты _NTO_MI_ENDIAN_BIG и _NTO_MI_ENDIAN_DIFF сообщат вам о порядке байт в слове для отправившей сообщение машины (в случае, если сообщение пришло через сеть от машины с другим порядком байт), бит _NTO_MI_NET_CRED_DIRTY зарезервирован для внутреннего использования, значение бита _NTO_MI_UNBLOCK_REQ мы рассмотрим в разделе «Использование бита _NTO_MI_UNBLOCK_REQ», см. ниже.
msglen Число принятых байт.
srcmsglen Длина исходного сообщения в байтах, как оно было отправлено клиентом. Это число может превышать значение msglen — например, в случае приема меньшего количества данных, чем было послано. Заметьте, что это поле действительно только в том случае, если установлен бит _NTO_CHF_SENDER_LEN в переданном функции ChannelCreate() (для канала, по которому было получено данное сообщение) параметре flags.
Идентификатор отправителя (receive ID), он же клиентский жетон (client cookie)

В примере программы, представленном выше, отметьте следующее:

rcvid = MsgReceive(...);

...

MsgReply(rcvid, ...);

Это — ключевой фрагмент, потому что именно в нем иллюстрируется привязка приема сообщения от клиента к последующему ответу этому конкретному клиенту. Идентификатор отправителя — это целое число, которое действует как жетон («magic cookie»), который вы получаете от клиента и обязаны хранить, если вы желаете впоследствии взаимодействовать с этим клиентом. Что произойдет, если вы его потеряете? Его больше нет. Функция MsgSend() клиента не разблокируется, пока вы (конкретный сервер) живы, или пока не произошел тайм-аут обмена сообщениями (и даже в этом случае все не так просто; см. функцию TimerTimeout() в справочном руководстве по библиотеке Си и обсуждение о применения в главе «Часы, таймеры и периодические уведомления», раздел «Тайм-ауты ядра»).

Не пытайтесь извлечь из значения идентификатора отправителя какой-либо конкретный смысл — он может измениться в будущих версиях операционной системы. Единственное, что нужно знать — что он уникален, то есть у вас никогда не будет двух различных клиентов с одним и тем же идентификатором отправителя (иначе ядро просто не сможет их различить, когда вы вызовете MsgReply()).

Отметим также, что за исключением одного частного случая (с применением функции MsgDeliverEvent(), которую мы рассмотрим позже), после вызова функции MsgReply() соответствующий идентификатор отправителя перестает иметь смысл.

Таким образом, мы плавно переходим к функции MsgReply().

Ответ клиенту

Функция MsgReply() принимает в качестве параметров идентификатор отправителя, код возврата, указатель на сообщение и размер этого сообщения. Мы только что обсудили идентификатор отправителя — он уникально идентифицирует того, кому должно быть отправлено ответное сообщение. Код возврата указывает, какой код должна возвратить функция MsgSend() клиента. Наконец, указатель на сообщение и размер указывают на местоположение и размер (необязательного!) ответного сообщения, которое следует отправить.

Функция MsgReply() может показаться очень простой (и так оно и есть), но рассмотреть ее применение было бы полезно.

А можно и не отвечать

Однако, вы вовсе не обязаны обязательно ответить клиенту перед приемом новых сообщений от других клиентов с помощью функции MsgReceive()! Это положение можно с успехом использовать в множестве различных сценариев.

В типовом драйвере устройства клиент может выдать запросом, который не будет обслужен в течение продолжительного времени. Например, клиент может запросить драйвер аналого-цифрового преобразователя (АЦП): «Сходи-ка принеси мне данные за следующие 45 секунд.» Драйвер АЦП не может себе позволить вывесить табличку «Закрыто» на целых 45 секунд, потому что другим клиентам тоже может срочно что-нибудь понадобиться — например, данные по другому каналу, информация о состоянии, и т.п.

В соответствии со своей архитектурой, драйвер АЦП просто поставит в очередь полученный от функции MsgReceive() идентификатор отправителя, осуществит запуск 45-секундного процесса накопления данных и снова вернется к обработке клиентских запросов. По истечении этого 45-секундного интервала, когда данные накоплены, драйвер АЦП сможет найти идентификатор отправителя, связанный с данным запросом, и ответить нужному клиенту.

Вам также может понадобиться задержаться с ответом клиенту в случае модели «сервер/субсервер» (то есть некоторые клиенты — на самом деле субсерверы). Вы можете просто запомнить идентификаторы ищущих работу субсерверов и сохранить их до поры до времени. Когда работа для субсерверов появится, тогда и только тогда вы ответите субсерверу, указав, что именно он должен сделать.

Ответ без данных или с кодом ошибки (errno)

Когда дело наконец доходит до ответа клиенту, вы совершенно не обязаны передавать ему какие-либо данные. Это может использоваться в двух случаях.

Вы можете отправить клиенту ответ без данных, если единственная цель ответа — разблокировать клиента. Скажем, клиент желает быть блокированным до некоторого события, а до какого именно — ему знать не обязательно. В этом случае функции MsgReply() не потребуется никаких данных, достаточно будет только идентификатора отправителя:

MsgReply(rcvid, EOK, NULL, 0);

Такой вызов разблокирует клиента (но не передаст ему никаких данных) и возвратит код EOK («успешное завершение»).

Как вариант, вы можете при желании возвратить клиенту код ошибки. Вы не сможете сделать это с помощью функции MsgReply(), вместо нее для этого используется функция MsgError():

MsgError(rcvid, EROFS);

В приведенном выше примере сервер обнаруживает, что клиент пытается записать данные в файловую систему, предназначенную только для чтения, и вместо данных возвращает клиенту код ошибки (errno) EROFS.

Еще одним поводом ответить клиенту без данных (и соответствующие вызовы мы вскоре рассмотрим) может быть то, что данные уже переданы ранее (с помощью функции MsgWrite()), и больше никаких данных нет.

Почему применяются два типа вызовов? Они немного различны. В то время как обе функции MsgError() и MsgReply() разблокируют клиента, функция MsgError() при этом не передаст никаких данных, заставит функцию MsgSend() клиента возвратить -1 и установит переменную errno на стороне клиента в значение, переданное функции MsgError() в качестве второго аргумента.

С другой стороны, функция MsgReply() может передавать данные (как видно из ее третьего и четвертого параметров) и заставляет функцию MsgSend() клиента возвратить значение, переданное MsgReply() в качестве второго аргумента. Переменная errno клиента остается нетронутой.

В общем случае, если вам нужно только сообщить о результатах действия («прошло/не прошло»), лучше применять функцию MsgError(). Если бы вы возвращали данные, здесь была бы необходима функция MsgReply(). Обычно, когда вы возвращаете данные, вторым параметром функции MsgReply() будет положительное целое число, указывающее на число возвращаемых байт.

Определение идентификаторов узла, процесса и канала (ND/PID/CHID) нужного сервера

Ранее мы отметили, что для соединения с сервером функции ConnectAttach() необходимо указать дескриптор узла (Node Descriptor — ND), идентификатор процесса (process ID — PID), а также идентификатор канала (Channel ID — CHID). До настоящего момента мы не обсуждали, как именно клиент находит эту информацию.

Если один процесс создает другой процесс, тогда это просто — вызов создания процесса возвращает идентификатор вновь созданного процесса. Создающий процесс может либо передать собственные PID и CHID вновь созданному процессу в командной строке, либо вновь созданный процесс может вызвать функцию getppid() для получения идентификатора родительского процесса, и использовать некоторый «известный» идентификатор канала.

А что если у нас два совершенно чужих процесса? Это возможно, например, в том случае, если сервер создан некоей третьей стороной, а вашему приложению нужно уметь общаться с этим сервером. Реально мы должны найти ответ на вопрос: «Как сервер объявляет о своем местонахождении?»

Существует множество способов сделать это; мы рассмотрим только три из них, в порядке возрастания «элегантности»:

1. Открыть файла с известным именем и сохранить в нем ND/PID/CHID. Такой метод является традиционным для серверов UNIX, когда сервер открывает файл (например,

/etc/httpd.pid
), записывает туда свой идентификатор процесса в виде строки ASCII и предполагают, что клиенты откроют этот файл прочитают из него идентификатор.

2. Использовать для объявления идентификаторов ND/PID/CHID глобальные переменные. Такой способ обычно применяется в многопоточных серверах, которые могут посылать сообщение сами себе. Этот вариант по самой своей природе является очень редким.

3. Занять часть пространства имен путей и стать администратором ресурсов. Мы поговорим об этом в главе «Администраторы ресурсов».

Первый подход относительно прост, но он чреват «загрязнением файловой системы», когда в каталоге

/etc
лежит куча файлов
*.pid
. Поскольку файлы устойчивы (имеется в виду, что они выживают после смерти создающего их процесса и перезагрузки машины), очевидного способа стереть эти файлы не существует — разве что использовать этакую программную «старуху с косой», постоянно проверяющую, не пора ли прибрать кого-то из них.

Имеется и другая связанная с этим подходом проблема. Поскольку процесс, который создал файл, может умереть, не удалив этот файл, то вы не сможете узнать, жив ли еще этот процесс, пока не попробуете передать ему сообщение. И это ещё не самое страшное — еще хуже, если комбинация ND/PID/CHID указанная в файле, оказывается настолько старой, что может быть повторно использована другой программой! Получив «чужое» сообщение, эта программа в лучшем случае его проигнорирует его, а ведь может и предпринять некорректные действия. Так что такой подход исключается.

Второй подход, где мы используем глобальные переменные для объявления значений ND/PID/CHID, не является общим решением проблемы, поскольку в нем предполагается способность клиента обратиться к этим глобальным переменным. А поскольку для этого требуется использование разделяемой памяти, это не будет работать в сети! Так что этот метод обычно используется либо в небольших тестовых программах, либо в очень специфичных случаях, но всегда в контексте многопоточной программы.

Что реально происходит, так это то, что один поток в программе является клиентом, а другой поток — сервером. Поток-сервер создает канал и затем размещает идентификатор канала в глобальной переменной (идентификаторы узла и процесса являются одинаковыми для всех потоков в процессе, так что объявлять их не обязательно). Поток-клиент затем берет этот идентификатор канала и выполняет по нему функцию ConnectAttach().

Третий подход — сделать сервер администратором ресурса — является определенно самым прозрачным и поэтому рекомендуемым общим решением. Механизм того, как это делается, изложен в главе «Администраторы ресурсов», а пока все, что вы должны об этом знать — это то, что сервер регистрирует некое имя пути как свою «область ответственности», а клиенты обращаются к нему обычным вызовом функции open().

Не сочту лишним подчеркнуть:

Файловые дескрипторы POSIX в QNX/Neutrino реализованы через идентификаторы соединений, то есть дескриптор файла уже является идентификатором соединения! Органичность этой схемы в том, что поскольку дескриптор файла, возвращаемый функцией open(), фактически является идентификатором соединения, клиенту не нужно выполнять какие-либо дополнительные действия, чтобы использовать это соединение. Например, когда клиент после вызова open() вызывает функцию read(), передавая ей полученный дескриптор, это с минимальными накладными расходами транслируется в функцию MsgSend().

А что насчет приоритетов?

А что произойдет, если сообщение серверу передадут одновременно два процесса с разными приоритетами?

Сообщения всегда доставляются в порядке приоритетов.

Если два процесса посылают сообщения «одновременно», первым доставляется сообщение от процесса с высшим приоритетом.

Если оба процесса имеют одинаковый приоритет, то сообщения будут доставлены в порядке отправки (поскольку в машине с одним процессором не бывает ничего одновременного, и даже в SMP-блоке будет присутствовать некий порядок, поскольку процессоры будут конкурировать между собой за доступ к ядру).

Мы еще вернемся к анализу других тонкостей этой проблемы чуть позже в этой главе, когда будем говорить о проблеме инверсии приоритетов.

Чтение и запись данных

До настоящего времени мы обсуждали основные примитивы обмена сообщениями. Как я и упоминал ранее, это минимум, который необходимо знать. Однако существует еще несколько дополнительных функций, которые делают нашу жизнь значительно проще.

Рассмотрим пример, в котором для обеспечения обмена сообщениями между клиентом и сервером нам понадобились бы и другие функции.

Клиент вызывает MsgSend() для передачи неких данных серверу. После вызова MsgSend() клиент блокируется. Теперь он ждет, чтобы сервер ему ответил.

Интересные события разворачиваются на стороне сервера. Сервер вызывает функцию MsgReceive() для приема сообщения от клиента. В зависимости от того, как вы спроектировали вашу систему сообщений, сервер может знать, а может и не знать, насколько велико сообщение клиента. Как сервер может не знать, каков реальный размер сообщения? Возьмем наш пример с файловой системой. Предположим, что клиент делает так:

write(fd, buf, 16);

Это сработает так, как и ожидается, если сервер вызовет MsgReceive() с размером буфера, скажем, 1024 байта. Так как наш клиент послал небольшое сообщение (28 байт), никаких проблем не будет.

А что если клиент отправит сообщение, превышающее по размеру 1024 байт — скажем, 1 мегабайт? Например, так:

write(fd, buf, 1000000);

Как сервер мог бы обработать это сообщение поизящнее? Мы могли, к примеру, сказать, что клиенту не позволяется записывать более чем n байт. Тогда функции write() в клиентской Си-библиотеке пришлось бы разбивать каждый «длинный» запрос на несколько запросов по n байт каждый. Неуклюже.

Другая проблема в этом примере заключается в вопросе «А каково должно быть n

Как вы видите, этот подход имеет следующие основные недостатки:

• Все функции, которые применяются для обмена сообщениями ограниченного размера, должны быть модифицированы в Си- библиотеке так, чтобы функция передавала запросы в виде серии пакетов. Это само по себе немалый объем работы. Также это может иметь ряд неожиданных побочных эффектов при работе в мнопоточной среде — что если первая часть сообщения от одного потока передана, и тут его вытесняет другой поток клиента и посылает свое собственное сообщение. Что будет с прерванным потоком тогда?

• Все серверы должны быть готовы к обработке сообщения максимально возможного размера. Это означает, что все серверы должны будут иметь значительные области данных, или Си-библиотека будет должна разделять большие запросы на несколько меньших, ухудшая тем самым быстродействие.

К счастью, эта проблема довольно просто обходится, причем даже с дополнительным выигрышем.

Здесь будут особенно полезны функции MsgRead() и MsgWrite(). Важно при этом помнить, что клиент блокирован — это означает, что он не собирается изменять данные, пока сервер их анализирует.

В многопоточном клиенте теоретически возможно, что в область данных заблокированного по серверу клиентского потока залезет другой поток. Такая ситуация рассматривается как некорректная (ошибка проектирования), поскольку серверный поток предполагает, что он имеет монопольный доступ к области данных клиента, пока тот заблокирован.

Функция MsgRead() описана так:

#include 


int MsgRead(int rcvid, void *msg, int nbytes, int offset);

Функция MsgRead() позволяет Вашему серверу считать nbytes байт данных из адресного пространства заблокированного клиента, начиная со смещения offset от начала клиентского буфера, в буфер, указанный параметром msg. Сервер не блокируется, а клиент не разблокируется. Функция MsgRead() возвращает число байтов, которые были фактически считаны, или возвращает -1, если произошла ошибка.

Итак, давайте подумаем, как бы мы использовали эти возможности в нашем примере с вызовом write(). Библиотечная функция write() создает сообщение с заголовком и посылает его серверу файловой системы

fs-qnx4
. Сервер принимает небольшую часть сообщения с помощью MsgReceive(), анализирует его и принимает решение, где разместить остальную часть сообщения — например, где-то в уже выделенном буфере дискового кэша.

Давайте рассмотрим пример.

Пример отправки сообщения серверу fs-qnx4 с непрерывным представлением данных.

Итак, клиент решил переслать файловой системе 4Кб данных. (Отметьте для себя, что Си-библиотека добавила к сообщению перед данными небольшой заголовок — чтобы потом можно было узнать, к какому типу принадлежал этот запрос. Мы еще вернемся к этому вопросу, когда будем говорить о составных сообщениях, а также — еще более детально — когда будем анализировать работу администраторов ресурсов.) Файловая система считывает только те данные (заголовок), которые будут ей необходимы для того, чтобы выяснить тип принятого сообщения:

// Часть заголовков, вымышлены для примера

struct _io_write {

 uint16_t type;

 uint16_t combine_len;

 int32_t nbytes;

 uint32_t xtype;

};


typedef union {

 uint16_t type;

 struct _io_read io_read;

 struct _io_write io_write;

 ...

} header_t;


header_t header; // Объявить заголовок


rcvid = MsgReceive(chid, &header, sizeof(header), NULL);

switch (header.type) {

 ...

case _IO_WRITE:

 number_of_bytes = header.io_write.nbytes;

 ...

Теперь сервер

fs-qnx4
знает, что в адресном пространстве клиента находится 4Кб данных (сообщение известило его об этом через элемент структуры nbytes), и что эти данные надо передать в буфер кэша. Теперь сервер
fs-qnx4
может сделать так:

MsgRead(rcvid, cache_buffer[index].data,

 cache_buffer[index].size, sizeof(header.io_write));

Обратите внимание, что операции приема сообщения задано смещение

sizeof(header.io_write)
— это сделано для того, чтобы пропустить заголовок, добавленный клиентской библиотекой. Мы предполагаем здесь, что
cache_buffer[index].size
(размер буфера кэша) равен 4096 (или более) байт.

Для записи данных в адресное пространство клиента есть аналогичная функция:

#include 


int MsgWrite(int rcvid, const void *msg, int nbytes,

 int offset);

Применение функции MsgWrite() позволяет серверу записать данные в адресное пространство клиента, начиная со смещения offset байт от начала указанного клиентом приемного буфера. Эта функция наиболее полезна в случаях, где сервер ограничен в ресурсах, а клиент желает получить от него значительное количество информации.

Например, в системе сбора данных клиент может выделить 4-мегабайтный буфер и приказать драйверу собрать 4 мегабайта данных. Драйверу вовсе не обязательно держать под боком здоровенный буфер просто так, на случай если кто-то вдруг неожиданно запросит передачу большого массива данных.

Драйвер может иметь буфер размером 128Кб для обмена с аппаратурой посредством DMA, а сообщение пересылать в адресное пространство клиента по частям, используя функцию MsgWrite() (разумеется, каждый раз увеличивая смещение на 128Кб). Когда будет передан последний фрагмент, можно будет вызывать MsgReply().

Передача нескольких фрагментов сообщения с помощью функции MsgWrite()

Отметим, что функция MsgWrite() позволяет вам записать различные компоненты данных в различные места, а затем либо просто разбудить клиента вызовом MsgReply():

MsgReply(rcvid, EOK, NULL, 0);

либо сделать это после записи заголовка в начало клиентского буфера:

MsgReply(rcvid, EOK, &header, sizeof(header));

Это довольно изящный трюк для записи неизвестного количества данных, когда вы узнаете, сколько данных нужно было записать, только когда запись уже закончена. Главное — если вы будете использовать второй метод, с записью заголовка после записи данных, не забудьте зарезервировать место под заголовок в начале клиентского буфера!

Составные сообщения

До сих пор мы демонстрировали только обмен сообщениями, когда данные передаются из одного буфера в адресном пространстве клиента в другой буфер в адресном пространстве сервера (и наоборот — в случае ответа на сообщение).

При том, что данный подход вполне приемлем для большинства приложений, его применение далеко не всегда эффективно. Вспомните: наша функция write() из Си-библиотеки берет переданный ей буфер и добавляет в его начало небольшой заголовок. Используя то, что мы уже изучили ранее, вы могли бы ожидать, что реализация write() в Си-библиотеке может выглядеть примерно так (это не реальный код!):

ssize_t write(int fd, const void *buf, size_t nbytes) {

 char *newbuf;

 io_write_t *wptr;

 int nwritten;

 newbuf = malloc(nbytes + sizeof(io_write_t));

 // Заполнить write_header

 wptr = (io_write_t*)newbuf;

 wptr->type = _IO_WRITE;

 wptr->nbytes = nbytes;

 // Сохранить данные от клиента

 memcpy(newbuf + sizeof(io_write_t), buf, nbytes);

 // Отправить сообщение серверу

 nwritten =

  MsgSend(fd, newbuf, nbytes + sizeof(io_write_t),

 newbuf, sizeof(io_write_t));

 free(newbuf);

 return(nwritten);

}

Понимаете, что произошло? Несколько неприятных вещей:

• Функция write() теперь должна быть способна выделить память под буфер достаточно большого размера как для данных клиента (которые могут быть довольно значительными по объему), так и для заголовка. Размер заголовка не имеет значения — в этом случае он был равен 12 байтам.

• Мы были должны скопировать данные дважды: в первый раз — при использовании функции memcpy(), и затем еще раз, снова — уже при осуществлении передачи сообщения.

• Мы должны были предусмотреть указатель на тип

io_write_t
и установить его на начало буфера, вместо использования обычных механизмов доступа (впрочем, это незначительный недостаток).

Поскольку ядро намерено копировать данные в любом случае, было бы хорошо, если бы мы смогли сообщить ему о том, что одна часть данных (заголовок) фиксирована по некоторому адресу, а другая часть (собственно данные) фиксирована где- нибудь еще, без необходимости самим вручную собирать буферы из частей и копировать данные.

На наше счастье, в QNX/Neutrino реализован механизм, который позволяет нам сделать именно так! Механизм этот называется IOV (i/o vector), или «вектор ввода/вывода».

Давайте для начала рассмотрим некоторую программу, а затем обсудим, что происходит с применением такого вектора.

#include 


ssize_t write(int fd, const void *buf, size_t nbytes) {

 io_write_t whdr;

 iov_t iov[2];

 // Установить IOV на обе части:

 SETIOV(iov + 0, &whdr, sizeof(whdr));

 SETIOV(iov + 1, buf, nbytes);

 // Заполнить io_write_t

 whdr.type = _IO_WRITE;

 whdr.nbytes = nbytes;

 // Отправить сообщение серверу

 return (MsgSendv(coid, iov, 2, iov, 1));

}

Прежде всего, обратите внимание на то, что не применяется никакой функции malloc() и никакой функции memcpy(). Затем обратим внимание на тип применяемого вектора IOV —

iov_t
. Это структура, которая содержит два элемента — адрес и длину. Мы определили массив из двух таких структур и назвали его iov.

Определение типа вектора

iov_t
содержится в
и выглядит так:

typedef struct iovec {

 void *iov_base;

 size_t iov_len;

} iov_t;

Мы заполняем в этой структуре пары «адрес — длина» для заголовка операции записи (первая часть) и для данных клиента (вторая часть). Существует удобная макрокоманда, SETIOV(), которая выполняет за нас необходимые присвоения. Она формально определена следующим образом:

#include 


#define SETIOV(_iov, _addr, _len) \

 ((_iov)->iov_base = (void *)(_addr), \

 (_iov)->iov_len = (_len))

Макрос SETIOV() принимает вектор

iov_t
, а также адрес и данные о длине, которые подлежат записи в вектор IOV.

Также отметим, что как только мы создаем IOV для указания на заголовок, мы сможем выделить стек для заголовка без использования malloc(). Это может быть и хорошо, и плохо — это хорошо, когда заголовок невелик, потому что вы хотите исключить головные боли, связанные с динамическим распределением памяти, но это может быть плохо, когда заголовок очень велик, потому что тогда он займет слишком много стекового пространства. Впрочем, заголовки обычно невелики.

В любом случае, вся важная работа выполняется функцией MsgSendv(), которая принимает почти те же самые аргументы, что и функция MsgSend(), которую мы использовали в предыдущем примере:

#include 


int MsgSendv(int coid, const iov_t *siov, int sparts,

 const iov_t *riov, int rparts);

Давайте посмотрим на ее аргументы:

coid Идентификатор соединения, по которому мы передаем — как и при использовании функции MsgSend().
sparts и rparts Число пересылаемых и принимаемых частей, указанных параметрами вектора
iov_t
; в нашем примере мы присваиваем аргументу sparts значение 2, указывая этим, что пересылаем сообщение из двух частей, а аргументу rparts — значение 1, указывая этим, что мы принимаем ответ из одной части.
siov и riov Эти массивы значений типа
iov_t
указывают на пары «адрес — длина», которые мы желаем переслать. В вышеупомянутом примере мы выделяем siov из двух частей, указывая ими на заголовок и данные клиента, и riov из одной части, указывая им только на заголовок.

Как ядро видит составное сообщение.

Ядро просто прозрачно копирует данные из каждой части вектора IOV из адресного пространства клиента в адресное пространство сервера (и обратно, при ответе на сообщение). Фактически, при этом ядро выполняет операцию фрагментации/дефрагментации сообщения (scatter/gather).

Несколько моментов, которые необходимо запомнить:

• Число фрагментов ограничено значением 231 (больше, чем вам придется использовать!); число 2 в нашем примере — типовое значение.

• Ядро просто копирует данные, указанные вектором IOV, из одного адресного пространства в другое.

Вектор-источник и вектор-приемник не должны совпадать.

Почему последний пункт так важен? Для того чтобы ответить, рассмотрим все подробнее. Со стороны клиента, скажем, мы выдали:

write(fd, buf, 12000);

в результате чего был создан вектор IOV из двух частей:

• заголовок (12 байт);

• данные (12000 байт);

На стороне сервера (скажем, сервера файловой системы

fs-qnx4
) мы имеем блоки памяти кэша до 4Кб каждый, и мы хотели бы эффективно принять сообщение непосредственно в эти блоки. В идеале мы бы написали что-то типа:

// Настроить структуру IOV для приема:

SETIOV(iov + 0, &header, sizeof(header.io_write));

SETIOV(iov + 1, &cache_buffer[37], 4096);

SETIOV(iov + 2, &cache_buffer[16], 4096);

SETIOV(iov + 3, &cache_buffer[22], 4096);

rcvid = MsgReceivev(chid, iov, 4, NULL);

Эта программа делает в значительной степени то, что вы и предполагаете: она задает вектор IOV из 4 частей, первая из которых указывает на заголовок, а следующие три части — на блоки кэш-памяти с номерами 37, 16 и 22. (Предположим, что именно эти блоки случайно оказались доступными в данный момент.) Ниже это иллюстрируется графически.

Распределение непрерывных данных по отдельным буферам.

Затем осуществляется вызов функции MsgReceivev(), и ей указывается, что мы намерены принять сообщение по указанному каналу (параметр chid), и что вектор IOV для этой операции состоит из 4 частей.

(Кроме возможности работать с векторами IOV, функция MsgReceivev() действует аналогично функции MsgReceive().)

Опа! Мы сделали ту же самую ошибку, которую уже делали к раньше, когда знакомились с функцией MsgReceive(). Как мы узнаем, сообщение какого типа мы собираемся принять и сколько в нем данных, пока не примем все сообщение целиком?

Мы сможем решить эту проблему тем же способом, что и прежде:

rcvid = MsgReceive(chid, &header, sizeof(header), NULL);

switch (header.message_type) {

 ...

case _IO_WRITE:

 number_of_bytes = header.io_write.nbytes;

 // Выделить / найти элемент кэша

 // Заполнить элементами кэша 3-элементный IOV

 MsgReadv(rcvid, iov, 3, sizeof(header.io_write));

Здесь мы вызываем «предварительную» MsgReceive() (отметьте, что тут мы не используем ее векторную форму, поскольку для сообщения, состоящего из одной части, в ней просто нет необходимости), определяем тип сообщения и затем продолжаем считывать данные из адресного пространства клиента (начиная со смещения

sizeof(header.io_write)
) в кэш-буферы, определенные трехэлементным вектором IOV.

Обратите внимание, что мы перешли от вектора IOV, состоящего из 4 частей (как в первом примере), к вектору IOV из 3 частей. Дело в том, что в первом примере первый из четырех элементов вектора IOV отводился под заголовок, который на этот раз мы считали непосредственно при помощи функции MsgReceive(), а последние три элемента аналогичны трехэлементному вектору из второго примера — они определяют место, куда мы хотим записать данные.

Можно представить, как мы ответили бы на запрос чтения:

1. Найти элементы кэша, которые соответствуют запрашиваемым данным.

2. Заполнить вектора IOV ссылками на них.

3. Применить функцию MsgWritev() (или MsgReplyv()) для передачи данных клиенту.

Отметим, что если данные начинаются не непосредственно с начала блока кэша (или другой структуры данных), то в этом нет никакой проблемы. Просто сместите первый вектор IOV на точку начала данных и соответственно откорректируйте поле размера.

Как насчет других версий?

Все функции обмена сообщениями, кроме функций семейства MsgSend*(), имеют одинаковую общую форму: если имя функции имеет суффикс «v», значит, она принимает в качестве аргументов вектор IOV и число его частей; в противном случае, она принимает указатель и длину.

Семейство MsgSend*() содержит четыре основных варианта реализации функций с точки зрения буферов источника и адресата, плюс два варианта собственно системного вызова — итого восемь.

В нижеприведенной таблице сведены данные о вариантах функций семейства MsgSend*().

Функция Буфер передачи Буфер приема
MsgSend() линейный линейный
MsgSendnc() линейный линейный
MsgSendsv() линейный IOV
MsgSendsvnc() линейный IOV
MsgSendvs() IOV линейный
MsgSendvsnc() IOV линейный
MsgSendv() IOV IOV
MsgSendvnc() IOV IOV

Под линейным буфером я подразумеваю, что передается единый буфер типа

void*
вместе с его длиной. Это легко запомнить: суффикс «v» означает «вектор», и он находится на том же самом месте, что и соответствующий параметр — первым или вторым, в зависимости от того, какой буфер — передачи или приема — объявляется векторным.

Хмм. Получается, что функции MsgSendsv() и MsgSendsvnc() идентичны? Да, по части параметров именно так оно и есть. Различие заключается в том, является функция точкой завершения (cancellation point) или нет. Версии с суффиксом «nc» («no cancellation» — прим. ред.) не являются точками завершения, в то время как версии без этого суффикса — являются. (Дополнительную информацию относительно точек завершения и завершаемости (cancelability) вообще можно найти в справочном руководстве по Си-библиотеке в главе, посвященной pthread_cancel().)

Реализация

Вероятно, вы уже подозревали, что все варианты функций MsgRead(), MsgReceive(), MsgSend() и функций MsgWrite() тесно связаны между собой. (Единственное исключение — функция MsgReceivePulse(); мы ее вкратце рассмотрим.)

Которые из этих функций следует применять? В общем-то вопрос этот является чисто философским. Что до меня лично, то я предпочитаю комбинировать.

Если мы посылаем или принимаем только одноэлементные сообщения, то зачем нам все эти проблемы с настройкой векторов IOV?

Накладные расходы (кстати, незначительные) по загрузке процессора обычно не зависят от того, настраиваете ли вы все сами или оставляете это ядру или библиотеке. Подход с использованием одноэлементных сообщений избавляет ядро от необходимости манипуляций с адресным пространством и поэтому работает несколько быстрее.

Следует ли вам применять функции, использующие IOV? Конечно! Используйте их всегда, когда вам приходится самостоятельно программировать обмен многоэлементными сообщениями. Никогда непосредственно не копируйте данные при передаче многоэлементных сообщений, даже если для этого потребуется всего несколько строк программы. Это перегрузит систему попытками минимизировать число реальных операций копирования данных туда-сюда; передача указателей происходит намного быстрее, чем копирование данных из буфера в буфер.

Сообщения типа «импульс» (pulse)

Все сообщения, которые мы обсуждали до настоящего времени, блокируют клиента. Как только клиент вызывает функцию MsgSend(), для него наступает тихий час. Клиент отдыхает до тех пор, пока сервер не ответит на сообщение.

Однако есть ситуации, где отправитель сообщения не может себе позволить блокироваться. Мы рассмотрим некоторые из них в главах «Прерывания» и «Часы, таймеры и периодические уведомления», а сейчас мы должны понять концепцию данной проблемы.

Механизм, который обеспечивает отправку сообщения без блокирования, называют «импульсом» (pulse). Импульс — это миниатюрное сообщение, которое:

• может перенести 40 бит полезной информации (8-битный код и 32 бита данных);

• является неблокирующим для отправителя;

• может быть получено точно так же, как и сообщение другого типа;

• ставится в очередь, если получатель не заблокирован в ожидании сообщения.

Прием импульса

Прием импульса выполняется очень просто: короткое, четко определенное сообщение передается функции MsgReceive(), как будто поток отправил обычное стандартное сообщение. Единственное различие состоит в том, что вы не сможете применить функцию MsgReply() к такому сообщению, поскольку, кроме всего прочего, общая идея импульса состоит в том, что это сообщение по своей сути является асинхронным. В данном разделе мы рассмотрим другую функцию, MsgReceivePulse(), применение которой полезно при обработке импульсов.

Единственно что забавляет при работе с импульсами — это то, что идентификатор отправителя, который возвращается функцией MsgReceive() при их приеме, имеет нулевое значение. Это верный индикатор того, что принятое сообщение является импульсом, а не стандартным сообщением клиента. В коде серверов вы будете часто видеть фрагменты, подобные представленному ниже:

#include 


rcvid = MsgReceive(chid, ...);

if (rcvid == 0) { // Это импульс

 // Определить тип импульса


 // Обработать его

} else { // Это обычное сообщение

 // Определить тип сообщения


 // Обработать его

}

Что внутри импульса?

Итак, вы принимаете сообщение с нулевым идентификатором отправителя. Что у него внутри? Вот фрагмент заголовочного файла

:

struct _pulse {

 _uint16    type;

 _uint16    subtype;

 _int8     code;

 _uint8    zero[3];

 union sigval value;

 _int32    scoid;

};

Элементы type и subtype равны нулю (это еще один признак того, что перед нами импульс); содержимое элементов code и value определяется отправителем. В общем случае элемент code будет указывать на причину, по которой был отправлен импульс, а параметр value будет содержать 32 бита данных, ассоциируемых с данным импульсом. Эти два поля и есть те самые 40 бит контента; другие поля пользователем не настраиваются.

Ядро резервирует отрицательные значения параметра code, оставляя 127 значений для программистов — для использования по своему усмотрению.

Элемент value в действительности является элементом типа

union
:

union sigval {

 int sival_int;

 void *sival_ptr;

};

Поэтому (в развитие примера с сервером, представленного выше) вы часто будете видеть программу, подобную этой:

#include 


rcvid = MsgReceive(chid, ...

if (rcvid == 0) { // Импульс

 // Определить тип импульса

 switch (msg.pulse.code) {

 case MY_PULSE_TIMER:

  // Сработал один из наших таймеров,

  // надо что-то делать...

  break;

 case MY_PULSE_HWINT:

  // Импульс получен от обработчика прерывания.

  // Надо заглянуть в поле «value»...

  val = msg.pulse.value.sival_int;

  // Сделать что-нибудь по этому поводу...

  break;

 case _PULSE_CODE_UNBLOCK:

  // Это импульс от ядра, разблокирующий клиента

  // Сделать что-нибудь по этому поводу...

  break;

  //и так далее...

 }

} else { // Обычное сообщение

 // Определить тип сообщения

 // Обработать его

}

В этой программе предполагается, конечно, что вы описали структуру msg так, чтобы она содержала элемент «

struct _pulse pulse;
», и что определены константы MY_PULSE_TIMER и MY_PULSE_HWINT. Код импульса _PULSE_CODE_UNBLOCK — один из тех самых отрицательных кодов, зарезервированных для ядра, как это было упомянуто выше. Вы можете найти полный список этих кодов (а также краткое описание поля value) в
.

Функция MsgReceivePulse()

Функции MsgReceive() и MsgReceivev() могут принимать либо стандартное сообщение, либо импульс. Однако, возможны ситуации, когда вы пожелаете принимать только импульсы. Лучшим примером этого является ситуация с сервером, когда вы приняли запрос от клиента на выполнение чего-нибудь, но не можете выполнить этот запрос сразу (возможно, из-за длительной операции, связанной с аппаратными средствами). В таких случаях следует, как правило, настроить аппаратные средства (или таймер, или что-нибудь еще) на передачу вам импульса всякий раз, когда происходит некое значительное событие.

Если вы напишете ваш сервер по стандартной схеме «ждать сообщения в бесконечном цикле», вы можете оказаться в ситуации, когда один клиент посылает вам запрос, а потом, пока вы ожидаете импульса, который должен сигнализировать об отработке запроса, приходит запрос от другого клиента. Вообще говоря, это как раз то что нужно — в конце концов, мы хотели иметь способность одновременно обслуживать множество клиентов. Однако, у вас могут быть веские основания отказать клиенту в обслуживании — например, если обслуживание клиента слишком ресурсоемко, и надо ограничить численность одновременно обрабатываемой клиентуры.

В таком случае вам потребуется обеспечить возможность «выборочного» приема только импульсов. Тут-то и становится актуальной функция MsgReceivePulse():

#include 


int MsgReceivePulse(int chid, void *rmsg, int rbytes,

 struct _msg_info *info);

Видно, что ее параметры те же, что и у функции MsgReceive() — идентификатор канала, буфер (и его размер), и параметр info — мы обсуждали его в параграфе «Кто послал сообщение?» Заметьте, что параметр info не применяется в импульсах. Вы можете спросить, почему он представлен в списке параметров. Ответ незамысловат: так было проще сделать. Просто передайте NULL!

Функция MgsReceivePulse() способна принимать только импульсы. Так, если бы у вас был канал с множеством потоков, блокированных на нем с помощью функции MsgReceivePulse() (и ни одного потока, блокированного на нем с помощью функции MsgReceive()), и некий клиент попытался бы отправить вашему серверу сообщение, то этот клиент остался бы заблокированным по передаче (Send-blocked) до тех пор, пока какой-либо поток сервера не вызовет MsgReceive(). Тем временем функция MsgReceivePulse() будет спокойно принимать импульсы.

Единственное, что можно гарантировать при совместном применении функций MsgReceivePulse() и MsgReceive(), — что функция MsgReceivePulse() обеспечит прием исключительно импульсов. Функция MsgReceive() сможет принимать как импульсы, так и обычные сообщения! Это происходит потому, что применение функции MsgReceivePulse() зарезервировано специально для случаев, где нужно исключить получение сервером обычных сообщений.

Это немного вводит в замешательство. Так как функция MsgReceive() может принимать и обычные сообщения, и импульсы, а функция MsgReceivePulse() может принимать только импульсы, то как быть с сервером, в котором применяются обе функции? Общий ответ такой. У вас есть пул потоков, выполняющих MsgReceive(). Этот пул потоков (один или более потоков — это зависит от числа клиентов, которое вы хотели бы обслуживать одновременно) отвечает за обработку запросов от клиентов.

Поскольку вы пытаетесь управлять численностью потоков- обработчиков, и некоторым из этих потоков может понадобиться блокироваться в ожидании импульса (например, от оборудования или от другого потока), вы блокируете поток-обработчик при помощи функции MsgReceivePulse(). Функция MsgReceivePulse() принимает только импульсы, а значит, ее применение гарантирует, что пока вы ждете импульса, к вам ненароком не просочится никакой клиентский запрос.

Функция MsgDeliverEvent()

Как было упомянуто выше в параграфе «Иерархический принцип обмена», существуют ситуации, когда приходится нарушать естественное направление передач.

Такой случай возможен, когда у вас есть клиент, который посылает серверу сообщение и при этом не хочет блокироваться, а результат может быть доступен только через некоторое время. Конечно, вы могли бы частично решить эту проблему путем применения многопоточных клиентов, выделяя в клиенте отдельный поток для блокирующих вызовов сервера, но это не всегда с успехом работает в больших системах, поскольку при большом количестве серверов количество ждущих потоков было бы слишком велико. Но допустим, вы не хотите здесь использовать многопоточность, а вместо этого вам нужно, чтобы сервер ответил клиенту сразу, и чем-то вроде «Заказ принят; я скоро вернусь». Здесь, поскольку сервер ответил, клиент теперь свободен продолжать свою работу. После того как сервер отработает запрос клиента, ему потребуется как-то сказать клиенту «Проснись, вот твой заказ.» Очевидно, как мы это уже видели при анализе иерархического принципа обмена, сервер не должен передавать сообщения клиенту, потому что если клиент в это же время отправит сообщение серверу, это может вызывать взаимную блокировку. Так как же сервер может послать сообщение клиенту без нарушения иерархического принципа?

В действительности это составная операция. Вот как это работает:

1. Клиент создает структуру типа

struct sigevent
и заполняет ее.

2. Клиент посылает сообщение серверу, в котором запрашивает: «Сделай для меня то-то, ответ дай сразу же, а по окончании работы уведоми меня об этом при помощи структуры

struct sigevent
— структуру прилагаю».

3. Сервер принимает сообщение (которое включает в себя структуру

struct sigevent
), сохраняет структуру
struct sigevent
и идентификатор отправителя и немедленно отвечает клиенту.

4. Теперь клиент выполняется — как и сервер.

5. Когда сервер завершает работу, он использует функцию MsgDeliverEvent(), чтобы сообщить об этом клиенту.

Мы рассмотрим более подробно структуру

struct sigevent
в главе «Часы, таймеры и периодические уведомления», в параграфе «Как заполнить структуру
struct sigevent
», а здесь мы только предположим, что структура
struct sigevent
— это «черный ящик», который содержит некоторое событие, используемое сервером для уведомления клиента.

Поскольку сервер хранит клиентские

struct sigevent
и идентификатор отправителя, он теперь сервер может вызвать функцию MsgDeliverEvent(), чтобы доставить событие клиенту, как клиент того и желал:

int MsgDeliverEvent(int rcvid, const struct sigevent *event);

Обратите внимание, что функция MsgDeliverEvent() принимает два параметра — идентификатор отправителя (rcvid) и доставляемое событие (event). Сервер никогда не изменяет и даже не читает событие! Этот момент важен, потому что это позволяет серверу доставлять события вне зависимости от их выбранного клиентом типа, без какой бы то ни было специальной обработки на стороне сервера.

Идентификатор rcvid — это идентификатор отправителя, который сервер получил от клиента. Заметьте, что это определенно особый случай. Обычно, после того как сервер ответил клиенту, идентификатор отправителя прекращает иметь значение (потому что клиент уже разблокирован, и сервер не может разблокировать его заново или считать/записать данные, и т.п.). Но в нашем случае, идентификатор отправителя содержит только информацию для ядра, какому клиенту должно быть доставлено событие. Вызывая MsgDeliverEvent(), сервер не блокируется — для сервера это неблокирующий вызов. Ядро доставляет событие клиенту, после чего тот выполняет какие бы то ни было соответствующие действия.

Флаги канала

Когда мы вначале книги изучали сервер (в параграфе «Сервер»), мы упомянули, что функция ChannelCreate() принимает параметр flags (флаги); правда, тогда мы вместо этого параметра передавали нуль.

Теперь пришло время более подробно изучить назначение параметра flags. Рассмотрим только некоторые из возможных его значений:

_NTO_CHF_FIXED_PRIORITY

Принимающий поток не изменит приоритет в зависимости от приоритета отправителя. (Мы поговорим о проблемах приоритетов более подробно в разделе «Наследование приоритетов»). Обычно (то есть если этот флаг не установлен) приоритет принимающего сообщение потока изменяется на приоритет потока- отправителя.

_NTO_CHF_UNBLOCK

Ядро посылает импульс всякий раз, когда поток клиента пытается разблокироваться. Чтобы клиент мог разблокироваться, сервер должен ему ответить. Мы обсудим это ниже, потому что это имеет некоторые интересные последствия — как для клиента, так и для сервера.

_NTO_CHF_THREAD_DEATH

Ядро посылает импульс всякий раз, когда блокированный на этом канале поток «умирает». Это полезно для серверов, которые желают поддержать фиксированную популяцию потоков в пуле, т. е. количество потоков, доступных для обслуживания запросов.

_NTO_CHF_DISCONNECT

Ядро посылает импульс всякий раз после того, как уничтожается последнее из имевшихся соединений сервера с некоторым клиентом.

_NTO_CHF_SENDER_LEN

Ядро доставляет серверу, наряду с остальной информацией, размер клиентского сообщения.

Флаг _NTO_CHF_UNBLOCK

Присмотримся к флагу _NTO_CHF_UNBLOCK. Этот флаг имеет несколько особенностей при его применении, интересных и для клиента, и для сервера.

Обычно (то есть когда сервер не устанавливает флаг _NTO_CHF_UNBLOCK), когда клиент хочет разблокироваться от MsgSend() (или MsgSendv(), MsgSendvs() или другой функции этого семейства), клиент просто берет и разблокируется. Клиент может пожелать разблокироваться по приему сигнала или по тайм-ауту ядра (см. функцию TimerTimeout() в Справочном руководстве по Си-библиотеке, а также главу «Часы, таймеры и периодические уведомления»). Неприятный аспект этого заключается в том, что сервер понятия не имеет, что клиент уже разблокирован и больше не ожидает ответа.

Давайте предположим, что у вас многопоточный сервер, и все потоки заблокированы с помощью функции MsgReceive(). Клиент посылает сообщение серверу, и один из потоков сервера принимает его. Клиент блокируется, поток же сервера активно обрабатывает запрос. Но прежде, чем поток сервера сможет ответить клиенту, клиент разблокируется из своего MsgSend() (предположим, что по причине приема сигнала).

Не забывайте: поток сервера по-прежнему обрабатывает поступивший от клиента запрос. Но так как клиент теперь разблокирован (например, его вызов MsgSend() возвратил EINTR), он теперь может послать серверу другой запрос. Вследствие особенности архитектуры серверов в QNX/Neutrino, очередное сообщение от этого клиента принял бы другой поток сервера, но идентификатор отправителя-то остается тем же самым! Сервер не сумеет различить эти два запроса, и когда первый поток сервера завершает обработку первого запроса и отвечает клиенту, фактически он отвечает на второе сообщение, а не на первое. Итак, первый поток сервера отвечает на второе сообщение клиента.

Это плохо уже само по себе; но давайте заглянем еще на шаг вперед. Теперь второй поток завершает свою работу по обработке запроса и пробует ответить клиенту. Но поскольку первый поток сервера уже ответил этому клиенту, а значит, этот клиент уже разблокирован, то попытка второго потока сервера ответить клиенту возвратится с ошибкой.

Эта проблема встречается только в многопоточных серверах, потому что в однопоточном сервере его единственный поток был бы по-прежнему занят обработкой первого запроса клиента. Это означает, что даже если бы клиент разблокировался и снова послал сообщение серверу, он перешел бы в SEND- блокированное состояние (а не в REPLY-блокированное состояние), позволив тем самым серверу закончить обработку первого запроса, ответить клиенту (что привело бы к ошибке, потому что клиент более не находится в REPLY-блокированном состоянии) и лишь затем принять второе сообщение. Здесь реальная проблема состоит в том, что сервер выполняет лишнюю операцию — обработку первого запроса. Операция же эта является абсолютно бесполезной, поскольку клиент больше не ожидает ее результатов.

Решение данной проблемы (в случае многопоточного сервера) заключается в том, что сервер должен при создании канала указать вызову ChannelCreate() флаг _NTO_CHF_UNBLOCK. Этот флаг скажет ядру: «Сообщи мне импульсом, когда клиент попробует разблокироваться, но не позволяй ему это делать! Я разблокирую клиента сам».

Ключевым моментом здесь является то, что этот флаг сервера изменяет поведение клиентов, не позволяя им разблокироваться до тех пор, пока им это не разрешит сервер.

В однопоточном сервере происходит следующее:

Действие Состояние клиента Состояние сервера
Клиент посылает запрос серверу Блокирован Обработка
Клиент получает сигнал Блокирован Обработка
Ядро передает импульс серверу Блокирован Обработка (первого сообщения)
Сервер завершает обработку первого запроса и отвечает клиенту Разблокирован, получены корректные данные Обработка (импульса)

Это не помогло клиенту разблокироваться, когда он должен был это сделать, но зато обеспечило, чтобы сервер не запутался. В подобном примере сервер мог вообще проигнорировать импульс, отправленный ему ядром. Это нормально — поскольку сделано предположение, что позволить клиенту быть заблокированным до тех пор, пока сервер не подготовит данные для него, безопасно.

Если вы хотите, чтобы сервер среагировал каким-то действием на посланный ядром импульс, то существует два способа реализации этого:

• Создать еще один поток в сервере, который «слушал» бы канал на предмет импульсов от ядра. Этот второй поток будет отвечать за отмену операции, выполняемой первым потоком. Отвечать клиенту может любой из этих двух потоков.

• Не выполнять задание клиента в потоке непосредственно, а поставить его в очередь заданий. Это обычно делается в приложениях, где сервер целенаправленно направляет задания клиента в очередь, и сервер является управляемым по событиям. Обычно одно из получаемых сервером сообщений указывает на то, что работа клиента завершена, и что пора отвечать. Когда в этом случае приходит импульс от ядра, сервер отменяет выполняемую для данного клиента работу и отвечает.

Какой из методов вам выбирать — это будет зависеть от типа работы, которую выполняет сервер. В первом случае сервер активно выполняет работу для клиента, так что у вас просто не будет иного выбора, чем применить второй поток, который слушал бы импульсы от ядра, сообщающие о разблокировании (далее — «импульсы разблокирования» — прим. ред.). Конечно, вы могли бы также организовать программный опрос в пределах потока для проверки, не пришел ли импульс, но программный опрос обычно удручает.

Во втором случае работу делает не сам сервер, а кто-то другой — возможно, оборудование, которому приказано «сходи и набери данных». При таком варианте поток сервера будет в любом случае блокирован по функции MsgReceive(), ожидая от оборудования признака завершения операции.

В обоих случаях сервер обязан ответить клиенту, иначе клиент останется заблокированным.

Проблема синхронизации

Но даже если вы используете флаг _NTO_CHF_UNBLOCK, как это описано выше, остается еще одна проблема синхронизации. Предположим, что несколько потоков вашего сервера заблокированы по функции MsgReceive() в ожиданий сообщения или импульса, и клиент посылает серверу сообщение. Один из потоков разблокируется и начнет обрабатывать запрос клиента. В процессе этого клиент вдруг пожелает разблокироваться, и ядро сгенерирует предупреждающий об этом импульс (импульс разблокирования). Другой поток сервера примет этот импульс. Фактически здесь мы имеем гонки потоков — первый поток на момент получения вторым импульса мог быть уже почти готов ответить клиенту. Если ответит второй поток (тот, который получил импульс), то есть шанс, что клиент разблокируется и передаст серверу еще одно сообщение. При этом первый поток сервера получает шанс завершить работу по первому запросу и ответить полученными данными на второй запрос:

Путаница в многопоточном сервере.

Также возможен такой вариант: поток, получивший импульс, готовится ответить клиенту, а в это время отвечает первый поток. Получается то же самое, что и раньше — первый поток разблокирует клиента, клиент передает второй запрос, второй поток (тот, который получил импульс) разблокирует клиента по второму запросу.

Здесь мы имеем ситуацию с двумя параллельными потоками обработки (один вызван сообщением клиента и один — импульсом). Обычно в таких ситуациях применяются мутексы.

К сожалению, это привело бы к проблеме — мутекс нужно было бы захватить немедленно после вызова MsgReceive() и освободить перед вызовом MsgReply(). Это, конечно, будет работать, но это войдет в противоречие с самим предназначением импульса разблокирования! (Сервер мог бы либо получить сообщение и игнорировать импульс разблокирования, пока не ответит клиенту, либо получить импульс разблокирования и отменить второй запрос клиента.)

Многообещающим решением (но в конечном счете все равно обреченным на провал) выглядит применение «мелкозернистого» мутекса, то есть мутекса, который захватывается и освобождается только в небольших областях потока управления (как, собственно, и предполагается использовать мутекс — вместо блокирования целого раздела, как это было предложено выше). Можно было бы организовать в сервере флаг «Мы уже ответили?», сбрасывая его при приеме сообщения и снова устанавливая после ответа на него. Непосредственно перед ответом на сообщение проверяется состояние этого флага. Если флаг указывает на то, что ответ уже произведен, то отвечать не надо. Мутекс при этом следовало бы захватывать и освобождать только в областях проверки и установки/сброса флага.

К сожалению, это не будет работать, потому что мы далеко не всегда имеем дело с двумя потоками управления — не всегда же клиент будет получать сигнал в процессе обработки запроса, порождая тем самым импульс разблокирования. Ниже приведен сценарий, где предложенная схема не сработает:

• Клиент передает сообщение серверу; после этого клиент блокируется, а сервер переключается в режим обработки.

• Поскольку сервер принял запрос от клиента, флаг ответа сбрасывается в 0, указывая этим, что мы все еще должны ответить.

• Сервер отвечает клиенту в нормальном режиме (потому что флаг был установлен в 0) и устанавливает флаг в 1, указывая этим, что если прибудет импульс разблокирования, то его следует игнорировать.

• (Проблемы начинаются здесь.) Клиент посылает серверу второе сообщение и почти немедленно после этого получает сигнал; ядро передает серверу импульс разблокирования.

• Поток сервера, который принял сообщение, намеревался захватить мутекс для проверки состояния флага, но еще не успел это сделать, поскольку был вытеснен.

• Другой поток сервера получает импульс разблокирования, и поскольку флаг по-прежнему находится в состоянии 1 с момента последней установки, игнорирует этот импульс.

• Первый поток сервера захватывает мутекс и сбрасывает флаг.

• К этому моменту событие разблокирования клиента потеряно.

Даже если вы усовершенствуете флаг, задав для него большее количество состояний (таких как «импульс получен», «дан ответ на импульс», «сообщение получено», «дан ответ на сообщение»), у вас по-прежнему останется проблема гонок при синхронизации, потому что не существует атомарной операции, связывавшей бы флаг ответа и функции приема сообщения и ответа на него. (Именно это и определяет суть проблемы — небольшие окна во времени, одно после MsgReceive(), но до сброса флага, и второе — после того, как флаг установлен, но до вызова MsgReply().) Единственный способ обойти проблему состоит в том, чтобы переложить работу по отслеживанию состояния флага на ядро.

Применение флага _NTO_MI_UNBLOCK_REQ

К счастью, ядро отслеживает для нас состояние этого флага — под это отведен один бит в информационной структуре сообщения (это структура типа

struct _msg_infо
, которая передается функции MsgReceive() в качестве последнего параметра, и которую можно также впоследствии получить по идентификатору отправителя, вызвав функцию MsgInfo()).

Этот флаг называется _NTO_MI_UNBLOCK_REQ и устанавливается в 1, если клиент желает разблокироваться (например, получив сигнал).

Это означает, что в многопоточном сервере у вас будет поток- обработчик, выполняющий работу для клиента, и еще один поток, который будет получать сообщения разблокирования (или другие; но сконцентрируемся пока только на сообщениях разблокирования). Когда от клиента приходит сообщение разблокирования, установите для себя флаг, чтобы дать вашей программе знать о желании потока разблокироваться.

Существуют две ситуации, которые необходимо рассмотреть:

• поток-обработчик заблокирован;

• поток-обработчик активен (выполняется).

Если поток-обработчик блокирован (например, в ожидании ресурса), то поток, получивший сообщение разблокирования, должен его разбудить. Когда поток-обработчик активизируется, он должен проверить состояние флага _NTO_MI_UNBLOCK_REQ и, если флаг установлен, дать ответ о ненормальном завершении. Если флаг сброшен, то поток может спокойно выполнять все, что ему необходимо для нормальной обработки запроса.

В противном случае, если поток-обработчик активен, он должен периодически проверять «флаг, выставляемый в его отношении» потоком, принимающим сообщение разблокирования, и если флаг установлен в 1, он должен ответить клиенту с кодом ошибки. Заметьте, что это всего-навсего оптимизация: в неоптимизированном случае поток-обработчик постоянно вызывал бы функцию MsgInfo() по идентификатору отправителя и проверял бит _NTO_MI_UNBLOCK_REQ самостоятельно.

Обмен сообщениями в сети

Прозрачный обмен сообщениями в сети не поддерживается в версии QNX/Neutrino 2.00, но это намечено к реализации в более поздних версиях данной ОС. (Поддержка этого механизма реализована в QNX/Neutrino, начиная с версии 2.11, и присутствует в QNX Realtime Platform, начиная с релиза 6.1.0 — прим. ред.) Я привожу рассмотрение этого вопроса в данной книге по двум причинам: 1) Когда этот механизм будет реализован, им можно будет воспользоваться. 2) Это настолько изящно, что грех не рассказать об этом!

Чтобы не вносить излишней путаницы, до сих пор я избегал вопроса о применении обмена сообщениями в сети, хотя реально это основополагающий фактор гибкости QNX/Neutrino!

Все, что вы узнали из книги до этого момента, применимо и к передаче сообщений по сети.

Ранее в этой главе я демонстрировал пример:

#include 

#include 


int main (void) {

 int fd;

 fd = open("/net/wintermute/home/rk/filename", O_WRONLY);

 write(fd, "Это обмен сообщениями\n", 24);

 close(fd);

 return(EXIT_SUCCESS);

}

В то время, я говорил, что это был пример «обмена сообщениями в сети». Клиент соединяется с сервером, определяемым тройкой ND/PID/CHID (и который оказывается на другом узле), а сервер выполняет на своем канале MsgReceive(). Клиент и сервер в данном случае абсолютно аналогичны клиенту и серверу в варианте с локальным узлом. Собственно, прекратить читать книгу можно прямо здесь — в передаче сообщений по сети нет ничего хитрого. На вам, наверное, любопытно как все это происходит? Читайте дальше!

Теперь, когда мы уже рассмотрели в подробностях особенности локального обмена сообщениями, мы можем более углубленно обсудить, как осуществляется передача сообщений в сети. И хотя это обсуждение может показаться сложным, на самом деле все сводится к двум этапам: этапу разрешения имен и этапу собственно передачи сообщений.

Вот рисунок, иллюстрирующий эти этапы:

Обмен сообщениями в сети. Отметьте, что модуль qnet разделен на две части.

На данном рисунке наш узел называется

magenta
, а целевой узел по аналогии с примером называется
wintermute
.

Рассмотрим взаимодействия, которые происходят, когда программа-клиент использует

qnet
, чтобы обратиться к серверу через сеть:

1. Функции open() клиента было предписано открыть файл с именем, которое начинается с

/net
. (Имя
/net
— имя по умолчанию, объявляемое администратором
qnet
— см. документацию по QNX/Neutrino, раздел
npi-qnet
). Клиент понятия не имеет, кто именно отвечает за конкретное имя пути, поэтому он соединяется с администратором процессов (шаг 1), чтобы выяснить, кому принадлежит ресурс. Это выполняется автоматически и не зависит от того, передаем ли мы сообщения по сети или нет. Поскольку все ресурсы, имена которых начинаются с
/net
, принадлежат администратору
qnet
, администратор процессов отвечает клиенту, что относительно этого имени пути надо спросить администратор
qnet
.

2. Клиент теперь посылает сообщение потоку администратора

qnet
, надеясь, что тот будет способен обработать запрос. Однако администратор
qnet
на этом узле не может предоставить клиенту конечный сервис, поэтому он сообщает клиенту, что тот должен обратиться к администратору процессов на узле
wintermute
. (Это делается специальным перенаправляющим сообщением, в котором содержатся ND/PID/CHID сервера, к которому надо обратиться взамен.) Это перенаправление также автоматически обрабатывается клиентской библиотекой.

3. Клиент соединяется с администратором процессов на узле

wintermute
. Это включает в себя отправку сообщения другому узлу с помощью драйверного потока
qnet
. Процесс
qnet
клиентского узла получает сообщение и транспортирует его через сетевую среду удаленному
qnet
, который, в свою очередь, доставляет его администратору процессов на узле
wintermute
. Администратор процессов этого узла разрешает остальную часть имени пути (в нашем примере это
/home/rk/filename
) и отвечает перенаправляющим сообщением. Это сообщение передается обратно — через
qnet
сервера по сетевой среде к
qnet
клиента и, наконец, самому клиенту. Поскольку в этом сообщении содержатся ND/PID/CHID нужного сервера, предоставляющего конечный сервис, клиент теперь знает, к кому обращаться (в нашем примере это администратор удаленной файловой системы).

4. Теперь клиент посылает запрос непосредственно нужному серверу. Маршрут следования сообщения здесь идентичен описанному в предыдущем пункте, за исключением того, что на этот раз связь с сервером осуществляется напрямую, а не через администратор процессов.

После того как пройдены этапы 1 и 3, все дальнейшие коммуникации осуществляются аналогично этапу 4. В вышеприведенном примере все сообщения типа open(), read() и close() идут по маршруту, описанном в этапе 4. Заметьте, что вся последовательность рассмотренных событий была запущена вызовом open(), но само сообщение open() все равно дошло до сервера-адресата так, как это описано этапом 4.

Для особо любопытных: на самом деле я пропустил в изложении один этап. На этапе 2, когда клиент спрашивает

qnet
об узле
wintermute
,
qnet
должен сначала выяснить, кто такой wintermute. Это может привести к еще одной сетевой транзакции для разрешения имени узла. Приведенный выше рисунок корректен, если предположить, что
qnet
заранее знал про узел с именем
wintermute
.

Мы еще вернемся к сообщениям, используемым функциями open(), read() и close() (а также другими функциями) в главе «Администраторы ресурсов».

Особенности обмена сообщениями в сети

Итак, как только соединение установлено, все дальнейшие операции обмена сообщениями осуществляются в соответствии с этапом 4, как указано на рисунке. Это может привести вас к ошибочному представлению, что передача сообщений по сети идентична локальной. К сожалению, это не так. Вот список отличий:

• более длительные задержки;

• функция ConnectAttach() возвращает признак успешного соединения независимо от того, является ли узел доступным или нет — реальный признак ошибки проявляется только при первой попытке передать сообщение;

• функция MsgDeliverEvent() не обеспечивает достаточной надежности;

• функции MsgReply(), MsgRead(), MsgWrite() являются блокирующими вызовами (в локальном варианте они не являлись таковыми);

• функция MsgReceive() не будет принимать все данные, посланные клиентом; сервер будет должен вызывать функцию MsgRead() для получения окончательных остальных данных.

Более длительные времена задержки

Поскольку передача сообщений теперь выполняется в некоторой среде, а не прямым копированием «память-память» под управлением ядра, можно смело ожидать, что затраты времени на передачу сообщения будут существенно выше (100-Мбитный Ethernet в сравнении с 128-битным динамическим ОЗУ с тактированием 100 МГц будет ориентировочно на один или два порядка медленнее). В дополнение к этому также будут сказываться накладные расходы протокола и повторные попытки передачи в сбойных сетях.

Воздействие на функцию ConnectAttach()

Когда вы вызываете функцию ConnectAttach(), вы задаете ей идентификаторы ND, PID и CHID. Все, что при этом происходит в QNX/Neutrino, заключается в возврате ядром идентификатора соединения драйверному потоку

qnet
, изображенному выше на рисунке. Поскольку никакого сообщения еще не отправлено, вы не имеете информации о том, доступен ли узел, к которому вы только что подсоединились, или нет. В обычном случае это не проблема, потому что большинство клиентов не будет самостоятельно вызывать ConnectAttach() и скорее воспользуется библиотечной функцией open(), которая перед передачей сообщения «open» сама вызывает ConnectAttach(). Это практически немедленно дает информацию о доступности удаленного узла.

Воздействие на функцию MsgDeliverEvent()

Когда сервер вызывает функцию MsgDeliverEvent() локально, ответственность за доставку события целевому потоку ложится на ядро. В сетевом варианте сервер также может вызывать функцию MsgDeliverEvent(), но на этот раз ядро доставит «заготовку» этого события администратору

qnet
, возлагая на него ответственность за доставку этой «заготовки» удаленному
qnet
, который уже доставит реальное событие клиенту. Так вот, на стороне сервера с этим вызовом могут возникнуть проблемы, потому что он не является блокирующим. Это означает, после вызова MsgDeliverEvent() сервер продолжает выполняться, и поздно уже оглядываться и говорить «Знаете, очень не хочется вас огорчать, но помните тот вызов MsgDeliverEvent()? Так вот, он не сработал...»

Воздействие на функции MsgReply(), MsgRead() и MsgWrite()

Чтобы уберечь функции MsgReply(), MsgRead() и MsgWrite() от вышеупомянутой проблемы MsgDeliverEvent(), эти функции при использовании их в сети преобразуются в блокирующие вызовы. В локальном случае они бы просто передали данные и разблокировались; в сети же мы должны либо удостовериться, что данные были доставлены клиенту (в случае MsgReply()), либо собственно передать данные по сети клиенту или от него (в случае двух других функций).

Воздействие на функцию MsgReceive()

Функция MsgReceive() (при использовании в сети) тоже оказывается под влиянием. На момент разблокирования функции MsgReceive() на стороне сервера

qnet
может еще не успеть передать все данные клиента. Это делается из соображений производительности.

В структуре

struct _msg_infо
, передаваемой функции MsgReceive() в качестве последнего параметра (мы подробно рассматривали эту структуру в параграфе «Кто послал сообщение?»), есть два флага:

msglen Указывает на фактическое количество данных, переданное функцией MsgReceive() (
qnet
любит передавать по 8Кб за один раз).
srcmsglen Указывает на количество данных, которое клиент хотел передать (определяется клиентом).

Таким образом, если бы клиент желал передать 1 мегабайт данных по сети, MsgReceive() сервера разблокировалась бы, установив параметр msglen в значение 8192 (указывая, что 8192 байта доступны в буфере); параметр srcmsglen при этом равнялся бы 1048576 (указывая, что клиент пытался переслать 1 мегабайт данных).

Затем сервер использует MsgRead() для получения остальной части данных из адресного пространства клиента.

Несколько замечаний о дескрипторах узлов

Еще одна любопытная вещь, которой мы еще не касались в обсуждениях обмена сообщениями, — это дескрипторы узлов, для краткости обозначаемые «ND» (сокр. от Node Descriptor — прим. ред.).

Вспомните: в наших примерах мы использовали символьные имена узлов, например,

/net/wintermute
. В QNX4 (предыдущая версия QNX до появления QNX/Neutrino) вся работа в сети была основана на концепции идентификатора узла, небольшого целого числа, уникально определяющего узел сети. Таким образом, в терминах QNX4 мы говорили бы что-то вроде «узел 61», или «узел 1», и это отражалось бы и на вызовах функций тоже.

При работе в QNX/Neutrino все узлы внутренне представляются 32-разрядными числами, но эти числа не являются уникальными в сети! Я имею в виду, что узел

wintermute
может думать об узле
spud
как об узле с дескриптором 7, в то время как сам узел
spud
может думать, что дескриптор 7 соответствует узлу
magenta
. Поясню подробнее, чтобы дать полную картину происходящего. В приведенной ниже таблице сведены примерные дескрипторы узлов, которые могли бы использоваться для описания трех узлов:
wintermute
,
spud
и
foobar
(не путать с аббревиатурой FUBAR — прим. ред. :-):

Узел
wintermute
spud
foobar
wintermute 0 7 4
spud 4 0 6
foobar 5 7 0

Обратите внимание, что каждый узел считает свой собственный дескриптор нулевым. Также отметьте, что для узла

spud
оба узла
wintermute
и
foobar
имеют дескриптор 7. Однако, для узла
foobar
узел
wintermute
имеет дескриптор 4, а узел
spud
— 6. Как я и упоминал раньше, эти номера не уникальны в сети, но они уникальны на каждом узле. Вы можете относиться к ним, как к файловым дескрипторам — два процесса, когда обращаются к одному и тому же файлу, могут иметь для него как одинаковый дескриптор, так и нет — все зависит от того, кто, когда и который файл открывает.

К счастью, вам не надо беспокоиться о дескрипторах узлов по ряду причин:

• Большинство осуществляемых вами операций обмена сообщениями «с внешним миром» будут реализовываться с помощью вызовов функций высокого уровня (таких как функция open(), приведенная в примере выше).

• Дескрипторы узлов не кэшируются — предполагается, что получив дескриптор, вы используете немедленно и забудете про него.

• Существует ряд библиотечных функций, предназначенных для преобразования имени пути (например,

/net/magenta
) в дескриптор узла.

Чтобы работать с дескрипторами узлов, вам понадобится подключить файл

, потому что он содержит прототипы семейства функций netmgr_*().

Для преобразования строки в дескриптор узла используется функция netmgr_strtond(). После получения дескриптора узла его следует сразу же применить в вызове функции ConnectAttach(). Не пытайтесь сохранять его какой-либо структуре данных! Веским основанием для этого является то, что администратор сети может решить повторно использовать дескриптор после отключения всех соединений с узлом.

Так что если вы получили дескриптор «7» для узла

/net/magenta
, подсоединились к нему, передали сообщение и затем отсоединились, то существует возможность того, что администратор сети заново назначит дескриптор «7» другому узлу.

Поскольку дескрипторы узлов в сети не уникальны, возникает вопрос: «А как передавать эти штуки по сети?» Очевидно, взгляды узла

magenta
и узла
wintermute
на дескриптор «7» будут радикально отличаться. Существуют два способа решения этой проблемы:

• Не передавать по сети дескрипторы узлов и пользоваться символьными именами (например,

/net/wintermute
).

• Применять функцию netmgr_remote_nd().

Первый метод хорош как универсальное решение. Второй метод достаточно удобен на практике.

int netmgr_remote_nd(int remote_nd, int local_nd);

Эта функция принимает два параметра, где remote_nd — дескриптор узла целевой машины, a local_nd — дескриптор узла, который нужно преобразовать из точки зрения локальной машины в точку зрения целевой. Результатом является дескриптор узла, корректный с точки зрения заданной удаленной машины.

Например, пусть

wintermute
— имя нашей локальной машины. У нас есть дескриптор узла «7», который является корректным на нашей локальной машине и указывает на узел
magenta
. Мы хотели бы выяснить, какой дескриптор узла использует узел
magenta
для связи с нашим узлом:

int remote_nd;

int magenta_nd;

magenta_nd = netmgr_strtond("/net/magenta", NULL);

printf("ND узла magenta — %d\n", magenta_nd);

remote_nd = netmgr_remote_nd(magenta_nd, ND_LOCAL_NODE);

printf("С точки зрения узла magenta, наш ND — %d\n",

 remote_nd);

Это программа могла бы вывести что-то вроде следующего:

ND узла magenta - 7

С точки зрения узла magenta, наш ND — 4

Это говорит о том, что на узле

magenta
нашему узлу соответствует дескриптор «4». (Обратите внимание на использование специальной константы ND_LOCAL_NODE, которая в действительности равна нулю, для указания на «локальный узел»).

Теперь вернемся к тому, о чем мы говорили в разделе «Кто послал сообщение?»). Параметр

struct _msg_info
содержит, среди всего прочего, два дескриптора узлов:

struct _msg_info {

 int nd;

 int srcnd;

 ...

};

Мы определили в описании для этих двух полей, что:

nd — дескриптор принимающего узла с точки зрения передающего;

srcnd — дескриптор передающего узла с точки зрения принимающего.

Так, для приведенного выше примера, где узел

wintermute
— локальный, а узел
magenta
— удаленный, когда узел
magenta
посылает нам (узлу
wintermute
) сообщение, эти поля заполняются следующим образом:

nd равен 7;

srcnd равен 4.

Наследование приоритетов

Одним из интересных моментов в операционных системах реального времени является феномен инверсии приоритетов.

Инверсия приоритетов наблюдается, например, в случае, когда поток с низким приоритетом потребляет все процессорное время, в то время как в состоянии готовности находится поток с более высоким приоритетом.

Вы, наверное, сейчас думаете: «Минуточку! Вы утверждали ранее, что поток с более высоким приоритетом будет всегда вытеснять поток с более низким приоритетом! Как же такое может быть?»

Вы абсолютно правы. Поток с более высоким приоритетом всегда будет вытеснять поток с более низким приоритетом. Но при этом все-таки может произойти кое-что интересное. Давайте рассмотрим сценарий с тремя потоками (в трех различных процессах, для простоты рассмотрения), где «L» — поток с низким приоритетом, «Н» — поток с высоким приоритетом, и «S» — сервер. На рисунке показаны все три потока со своими приоритетами.


Три потока с различными приоритетами.

В данный момент выполняется поток Н. Потоку сервера S, имеющему наивысший приоритет, пока делать нечего, так что он находится в режиме ожидания и блокирован на функции MsgReceive(). Поток L и хотел бы работать, но его приоритет ниже, чем у потока Н, который выполняется в данный момент. Все как вы и предполагали, да?

А теперь представьте себе, что поток Н принял решение «прикорнуть» на 100 миллисекунд — возможно, чтобы подождать медленное оборудование. Теперь выполняется поток L.

Вот тут-то все интересное и начинается.

В пределах своего нормального функционирования поток L посылает сообщение потоку сервера S, принуждая этим сервер S перейти в состояние READY и (поскольку поток S имеет высший приоритет из всех готовых к выполнению потоков) начать выполняться. К великому сожалению, сообщение, которое поток L направил к потоку сервера S, было сформулировано так: «Вычислить значение Пи с точностью до 50 знаков после запятой».

Очевидно, это займет более чем 100 миллисекунд. Поэтому, когда 100 миллисекунд сна потока Н истекут, поток Н перейдет в состояние READY — угадайте, что дальше? Поток Н не активизируется, постольку в состоянии READY находится поток S, имеющий более высокий приоритет!

Что здесь произошло? Произошло то, что поток с низким приоритетом «отстранил» от работы поток с более высоким приоритетом путем передачи процессора потоку с еще более высоким приоритетом. Это явление называется инверсией приоритетов.

Чтобы научиться не допускать таких вещей, мы должны поговорить о наследовании приоритетов. Простой вариант реализации наследования приоритета — заставить сервер S унаследовать приоритет клиентского потока:

Блокированные потоки.

При таком сценарии по истечении 100-миллисекундного интервала бездействия потока Н этот поток переходит в состояние READY и немедленно ставится на выполнение как имеющий наивысший приоритет.

Неплохо; однако, здесь есть еще один тонкий момент.

Предположим, что потоку Н вдруг становится нужно выполнить какие-то вычисления — например, найти 5034-е по порядку простое число. Он посылает сообщение потоку сервера S и блокируется.

Однако, в данный момент S по-прежнему вычисляет значение Пи, находясь на приоритете 5! В нашей выбранной для примера системе наверняка достаточно других потоков, имеющих приоритет выше, чем 5, которым тоже нужен процессор. Это автоматически значит, что процессорного времени на вычисление значения Пи у S остается не так уж и много.

Это еще одна форма инверсии приоритетов. В этом случае поток с низким приоритетом помешал потоку с более высоким приоритетом получить доступ к ресурсу. Сравните это с первой формой инверсии приоритета, где поток с низким приоритетом реально потреблял ресурсы процессора — в рассматриваемом сейчас случае этот поток не дает более приоритетному потоку доступа к ресурсам процессора, но сам при этом непосредственно их не потребляет.

К счастью, данная проблема решается тоже достаточно просто. Достаточно увеличить приоритет сервера так, чтобы он был равен наивысшему из приоритетов всех заблокированных клиентов:

Повышение приоритета сервера.

Здесь мы немного «обделяем» другие потоки, позволяя заданию потока L выполняться с приоритетом выше, чем он сам, но зато гарантируем, что поток Н получит свою заслуженную порцию процессорного времени.

Так в чем тут хитрость?

Никакой хитрости нет, QNX/Neutrino делает все для вас автоматически.

В QNX/Neutrino этот механизм реализован только на один уровень вглубь, так что если клиент посылает серверу сообщение, а этот сервер, в свою очередь, передает это сообщение другому серверу, то второй сервер наследует нормальный приоритет первого, а не приоритет, который первый унаследовал от клиента. Это означает, что если на первом сервере блокируется поток с более высоким приоритетом, то соответственно повышен будет приоритет только первого сервера (а поскольку первый сервер в этот момент блокирован на втором, приоритет которого уже не повышается и остается прежним, толку от этого не будет абсолютно никакого). Будьте внимательны!

Однако, и здесь есть еще одна тонкость. Как обеспечить возврат приоритета на тот уровень, который был до изменения?

Ваш сервер работает, обслуживает запросы клиентуры и автоматически регулирует свой приоритет каждый раз, когда ему приходится разблокироваться из функции MsgReceive(). Но когда он должен восстанавливать прежнее значение приоритета, которое было до вызова MsgReceive()?

Рассмотрим два варианта развития событий.

• После обслуживания клиента сервер выполняет еще какие-то дополнительные действия. Это он должен сделать на своем приоритете, а не на приоритете клиента.

• После обслуживания клиента сервер немедленно вызывает MsgReceive() снова для обработки следующего запроса.

В первом случае для сервера было бы некорректно работать на приоритете клиента, поскольку он больше не делает для этого клиента никакой работы. Решение здесь очень простое. Используйте функцию pthread_setschedparam() (мы ее обсуждали в главе «Процессы и потоки») для возврата приоритету нужного значения.

Что касательно второго случая, то ответ достаточно прост. Кому какое дело?

Подумайте об этом. Какая разница, станет сервер RECEIVE-блокированным на приоритете 29 или на приоритете 2?

Главное — что он RECEIVE-блокирован! А коль скоро в этом состоянии он не расходует процессорное время, его приоритет является несущественным. Как только функция MsgReceive() разблокирует сервер, сервером будет унаследован приоритет нового клиента, и все будет работать как полагается.

Резюме

Обмен сообщениями представляет собой чрезвычайно мощную концепцию и является одним из основополагающих принципов, на которых построена QNX/Neutrino (как и все предыдущие версии QNX).

С помощью механизма обмена сообщениями клиент и сервер обмениваются информацией (между потоками в пределах одного процесса, между потоками в различных процессах на том же самом узле или между потоками в различных процессах на различных узлах сети). Клиент посылает сообщение и блокируется до тех пор, пока сервер не примет сообщение, не обработает его и не ответит на него.

Основные преимущества передачи сообщений:

• Содержание сообщения не зависит от местоположения адресата (локально или удаленно в сети).

• Сообщения обеспечивают четкую границу развязки клиентов и серверов.

• Неявные автоматические механизмы синхронизации и соблюдения очередности сообщений упрощают проектирование ваших приложений.

Загрузка...