Ожидание завершения порожденных потоков выполнения
Помимо устранения конфликтов при выводе данных, блокировки модуля потоков имеют и другие очень полезные применения. Они могут использоваться в качестве основы парадигм синхронизации более высокого уровня (например, семафоров) и использоваться как универсальные инструменты взаимодействий между потоками.17 18 19 20 21 22 В частности, в примере 5.8 глобальный список блокировок позволяет установить окончание работы всех дочерних потоков.
Пример 5.8. PP4E\System\Threads\thread-count-wait1.py
использование мьютексов в родительском/главном потоке выполнения для определения момента завершения дочерних потоков, взамен time.sleep; блокирует stdout, чтобы избежать конфликтов при выводе;
import _thread as thread
stdoutmutex = thread.allocate_lock()
exitmutexes = [thread.allocate_lock() for i in range(10)]
def counter(myId, count): for i in range(count): stdoutmutex.acquire() print('[%s] => %s’ % (myId, i)) stdoutmutex.release()
exitmutexes[myId].acquire() # сигнал главному потоку
for i in range(10):
thread.start_new_thread(counter, (i, 100))
for mutex in exitmutexes:
while not mutex.locked(): pass print(‘Main thread exiting.’)
Для проверки состояния блокировки можно использовать ее метод locked. Главный поток создает по одной блокировке для каждого дочернего потока, помещая их в глобальный список exitmutexes (не забывайте, что функция потока использует глобальную область совместно с главным потоком). По завершении каждый поток приобретает свою блокировку в списке, а главный поток просто ждет, когда будут приобретены все блокировки. Это значительно более точный подход, чем просто приостанавливать работу на определенное время, пока выполняются дочерние потоки, в надежде обнаружить после возобновления, что все они будут завершены.
В зависимости от операций, выполняемых в потоках, все это можно организовать еще проще: поскольку потоки в любом случае совместно используют глобальную память, того же результата можно добиться с помощью простого глобального списка целых чисел, а не блокировок. В примере 5.9 пространство имен модуля (область видимости), как и прежде, совместно используется программным кодом верхнего уровня и функцией, выполняемой в потоке. Имя exitmutexes ссылается на один и тот же объект списка в главном потоке и во всех порождаемых потоках. По этой причине изменения, производимые в потоке, видны в главном потоке без использования лишних блокировок.
Пример 5.9. PP4E\System\Threads\thread-count-wait2.py
использование простых глобальных данных (не мьютексов) для определения момента завершения всех потоков в родительском/главном потоке; потоки совместно используют список, но не его элементы, при этом предполагается, что после создания список не будет перемещаться в памяти import _thread as thread stdoutmutex = thread.allocate_lock() exitmutexes = [False] * 10
def counter(myId, count): for i in range(count): stdoutmutex.acquire() print(‘[%s] => %s’ % (myId, i)) stdoutmutex.release()
exitmutexes[myId] = True # сигнал главному потоку
for i in range(10):
thread.start_new_thread(counter, (i, 100))
while False in exitmutexes: pass print(‘Main thread exiting.’)
Вывод этого сценария похож на вывод предыдущего - 10 потоков параллельно ведут счет до 100 и в процессе работы синхронизируют свои обращения к функции print. Фактически оба последних сценария с потоками-счетчиками производят вывод, в общем аналогичный первоначальному сценарию thread_count.py, но данные при выводе в stdout не повреждаются, значения счетчиков больше и отличается случайный порядок вывода строк. Основное отличие состоит в том, что главный поток завершает работу сразу после (и не раньше!) порожденных дочерних потоков:
C:\...\PP4E\System\Threads> python thread-count-wait2.py ...часть вывода удалена...
[4]
=>
98
[6]
=>
98
[8]
=>
98
[5]
=>
98
[0]
=>
99
[7]
=>
98
[9]
=>
98
[1]
=>
99
[3]
=>
99
[2]
=>
99
[4]
=>
99
[6]
=>
99
[8]
=>
99
[5]
=>
99
[7] => 99 [9] => 99
Main thread exiting.
Альтернативные приемы: циклы занятости, аргументы и менеджеры контекста
Обратите внимание, что главные потоки выполнения в двух последних сценариях в конце выполняют цикл ожидания, который может заметно снизить производительность в критически важных приложениях. В таких ситуациях достаточно просто добавить в цикл ожидания вызов функции time.sleep, чтобы оформить паузу между проверками и освободить процессор для других заданий: эта функция будет приостанавливать только вызывающий поток выполнения (в данном случае - главный поток). Можно также попробовать добавить вызов функции sleep в функцию, которая выполняется в потоках, чтобы сымитировать выполнение продолжительных операций.
Для единообразия вместо использования глобальной области видимости можно было бы также организовать передачу блокировки в виде аргумента функции, которая выполняется в потоках. В этом случае все потоки выполнения будут ссылаться на один и тот же объект блокировки, потому что все они являются частью одного и того же процесса. Память процесса, занятая объектом, является памятью, совместно используемой потоками независимо от того, как будет получена ссылка на этот объект (через глобальные переменные, через аргументы функции, через атрибуты объектов или каким-либо другим способом).
И еще - чтобы гарантировать освобождение блокировки при выходе потока выполнения из критического блока, можно использовать инструкцию with, как мы делали это в предыдущей главе, чтобы обеспечить закрытие файлов. Менеджер контекста блокировки приобретает блокировку при входе в инструкцию with и освобождает ее при выходе из тела инструкции, независимо от того, возникло исключение или нет. Этот прием позволяет сэкономить одну строку программного кода и дополнительно гарантирует освобождение блокировки в ситуациях, когда возможно появление исключения. Все эти приемы реализованы в примере 5.10, представляющем улучшенную версию нашего сценария с потоками-счетчиками.
Пример 5.10. PP4E\System\Threads\thread-count-wait3.py
объект мьютекса, совместно используемый всеми потоками выполнения, передается функции в виде аргумента; для автоматического приобретения/освобождения блокировки используется менеджер контекста; чтобы избежать излишней нагрузки в цикле ожидания, и для имитации выполнения продолжительных операций добавлен вызов функции sleep
import _thread as thread, time
stdoutmutex = thread.allocate_lock() numthreads = 5
exitmutexes = [thread.allocate_lock() for i in range(numthreads)]
def counter(myId, count, mutex): # мьютекс передается в аргументе for i in range(count):
time.sleep(1 / (myId+1)) # различные доли секунды
with mutex: # приобретает/освобождает блокировку: with
print(‘[%s] => %s’ % (myId, i))
exitmutexes[myId].acquire() # глобальный список: сигнал главному потоку
for i in range(numthreads):
thread.start_new_thread(counter, (i, 5, stdoutmutex))
while not all(mutex.locked() for mutex in exitmutexes): time.sleep(0.25) print(‘Main thread exiting.’)
Различные времена ожидания для разных потоков выполнения делают их более независимыми:
C:\...\PP4E\System\Threads> thread-count-wait3.py
[4] => 0
[3] => 0
[2] => 0
[4] => 1
[1] => 0
[3] => 1
[4] => 2
[2] => 1
[3] => 2
[4] => 3 [4] => 4
[0] => 0
[1] => 1
[2] => 2
[3] => 3 [3] => 4 [2] => 3 [1] => 2 [2] => 4 [0] => 1 [1] => 3 [1] => 4 [0] => 2 [0] => 3 [0] => 4
Main thread exiting.
Конечно, потоки выполнения могут решать гораздо более сложные задачи, чем простой подсчет. Более практичный пример использования глобальных данных мы рассмотрим в разделе «Добавляем пользовательский интерфейс» в главе 13, где они будут играть роль сигналов главному потоку, управляющему графическим интерфейсом на основе библиотеки tkinter, о завершении дочерним потоком передачи данных по сети, а также в главе 10, в примере реализации модуля threadtools, и в главе 14, в примере приложения PyMailGUI, для отображения результатов отправки электронной почты в графическом интерфейсе (дополнительные указания по этой теме вы найдете в разделе «Графические интерфейсы и потоки выполнения: предварительное знакомство» ниже, в этой главе). Возможность совместного доступа к глобальным данным из потоков выполнения также является основой организации очередей, которые обсуждаются далее в главе, - каждый поток выполнения может извлекать или добавлять данные, используя один и тот же общий объект очереди.
Модуль threading
В стандартную библиотеку Python входят два модуля для работы с потоками: _thread - основной низкоуровневый интерфейс, который демонстрировался до сих пор, и threading - интерфейс более высокого уровня, основанный на объектах и классах. Внутри модуль threading использует модуль _thread для реализации объектов, представляющих потоки и инструменты синхронизации. Он в какой-то мере основан на подмножестве модели потоков выполнения языка Java, но есть различия, которые заметят только программисты Java.23 В примере 5.11 приводится еще одна, последняя версия нашего сценария с потоками-счетчиками, демонстрирующая интерфейсы этого нового модуля.
Пример 5.11. PP4E\System\Threads\thread-classes.py
экземпляры класса Thread, сохраняющие информацию о состоянии и обладающие методом run() для запуска потоков выполнения; в реализации используется высокоуровневый и Java-подобный метод join класса Thread модуля threading (вместо мьютексов и глобальных переменных), чтобы известить главный родительский поток о завершении дочерних потоков; подробности о модуле threading ищите в руководстве по стандартной библиотеке;
import threading
class Mythread(threading.Thread): # подкласс класса Thread
def __init__(self, myId, count, mutex):
self.myId = myId
self.count = count # информация для каждого потока
self.mutex = mutex # совместно используемые объекты,
threading.Thread.__init__(self) # вместо глобальных переменных
def run(self): # run реализует логику потока
for i in range(self.count): # синхронизировать доступ к stdout
with self.mutex:
print(‘[%s] => %s’ % (self.myId, i))
stdoutmutex = threading.Lock() # то же, что и thread.allocate_lock()
threads = []
for i in range(10):
thread = Mythread(i, 100, stdoutmutex) # создать/запустить 10 потоков thread.start() # вызвать метод run потока
threads.append(thread)
for thread in threads:
thread.join() # ждать завершения потока
print(‘Main thread exiting.’)
Этот сценарий производит точно такой же вывод, как и его предшественники (и снова строки случайно распределены по времени, в зависимости от используемой платформы):
C:\...\PP4E\System\Threads> python thread-classes.py ...часть вывода удалена...
[4] => 98
[8] => 97
[9] => 97
[5] => 98
[3] => 99
[6] => 98
[7] => 98
[4] => 99
[8] => 98
[9] => 98
[5] => 99
[6] => 99
[7] => 99
[8] => 99
[9] => 99
Main thread exiting.
Использование модуля threading заключается в основном в определении новых классов. Потоки в этом модуле реализуются с помощью объекта Thread - класса Python, который наследуется и специализируется в каждом приложении путем реализации метода run, определяющего действия, выполняемые потоком. Например, в данном сценарии создается подкласс Mythread класса Thread, метод run которого будет вызываться родительским классом Thread в новом потоке после создания экземпляра класса Mythread и вызова его метода start.
Иными словами, этот сценарий просто обеспечивает методы, предполагаемые структурой класса Thread. Преимущество этого приема, требующего создания большего объема программного кода, заключается в том, что он обеспечивает «бесплатный» доступ к информации о состоянии каждого потока в отдельности (в виде атрибутов экземпляра) и к ряду дополнительных инструментов для работы с потоками, предоставляемых данной структурой. К примеру, используемый в конце сценария метод Thread.join ожидает завершения (по умолчанию) потока выполнения - этот метод можно использовать, чтобы предотвратить завершение главного потока до того, как завершится дочерний поток, и отказаться от вызова функции time.sleep, глобальных блокировок и переменных, использовавшихся в предыдущих примерах с потоками.
Кроме того, для синхронизации доступа к стандартному потоку вывода в примере 5.11 используется конструктор threading.Lock (хотя в текущей реализации это просто синоним конструктора _thread.allocate_lock). Модуль threading предоставляет и другие структуры классов, но они не влияют на общую картину многопоточной модели параллельной обработки данных.
Другие способы реализации потоков выполнения с помощью модуля threading
Класс Thread можно также использовать для запуска простых функций и вызываемых объектов других типов, вообще не создавая подклассы. Метод run класса Thread по умолчанию просто вызывает объект, переданный конструктору в аргументе target, со всеми дополнительными аргументами, переданными в аргументе args (который по умолчанию является пустым списком ()). Это позволяет использовать класс Thread для запуска простых функций, хотя такая форма вызова ненамного проще использования модуля _thread. Например, в следующих фрагментах демонстрируются четыре различных способа запуска одного и того же потока (смотрите сценарии four-threads*.py в дереве примеров; вы можете запустить все четыре потока в одном сценарии, но при этом вам понадобится синхронизировать обращения к функции print, чтобы избежать смешивания выводимых данных):
import threading, _thread def action(i): print(i ** 32)
# подкласс, хранящий собственную информацию о состоянии class Mythread(threading.Thread):
def __init__(self, i): self.i = i
threading.Thread.__init__(self)
def run(self): # переопределить метод run
print(self.i ** 32)
Mythread(2).start() # метод start вызовет метод run()
# передача простой функции
thread = threading.Thread(target=(lambda: action(2))) # run вызовет target thread.start()
# то же самое, но без lambda-функции,
# сохраняющей информацию о состоянии в образуемом ею замыкании threading.Thread(target=action, args=(2,)).start() # вызываемый объект
# и его аргументы
# с помощью модуля thread
_thread.start_new_thread(action, (2,)) # полностью процедурный интерфейс
Как правило, выбирать реализацию потоков на основе классов имеет смысл, когда потоки должны сохранять информацию о своем состоянии или когда желательно использовать какие-либо из многочисленных преимуществ ООП. Однако классы потоков выполнения необязательно должны наследовать класс Thread. Фактически, как и при использовании модуля _thread, реализация потоков в модуле threading может принимать в аргументе target вызываемые объекты любого типа. При объединении с такими приемами, как связанные методы и вложенные области видимости, различия между приемами программирования становятся еще менее выраженными:
# обычный класс с атрибутами, ООП class Power:
def __init__(self, i): self.i = i def action(self):
print(self.i ** 32)
obj = Power(2)
threading.Thread(target=obj.action).start() # запуск связанного метода
# вложенная область видимости, для сохранения информации о состоянии def action(i):
def power():
print(i ** 32) return power
threading.Thread(target=action(2)).start() # запуск возвращаемой функции
# запуск обоих вариантов с помощью модуля _thread
_thread.start_new_thread(obj.action, ()) # запуск вызываемого объекта
_thread.start_new_thread(action(2), ())
Как видите, интерфейс модуля threading такой же гибкий, как и сам язык Python.
Еще раз о синхронизации доступа к совместно используемым объектам и переменным
Выше мы видели, что вызовы функции print в потоках выполнения необходимо синхронизировать с помощью блокировок, чтобы избежать смешивания выводимых данных, потому что стандартный поток вывода совместно используется всеми потоками выполнения. Строго говоря, потоки выполнения должны синхронизировать любые операции изменения совместно используемых объектов и переменных. В зависимости от целей программы в число этих объектов могут входить:
• Изменяемые объекты в памяти (объекты, ссылки на которые передаются потокам или приобретаются каким-то иным способом, продолжительность существования которых превышает время работы потоков)
• Переменные в глобальной области видимости (изменяемые переменные, объявленные за пределами функций и классов, выполняемых в потоках)
• Содержимое модулей (для каждого модуля существует всего одна копия записи в системной таблице модулей)
Даже при работе с простыми глобальными переменными может потребоваться координация действий, если есть вероятность одновременных попыток их изменения, как показано в примере 5.12.
Пример 5.12. PP4E\System\Threads\thread-add-random.py
"выводит различные результаты при каждом запуске под Windows 7”
import threading, time count = 0
def adder(): global count
count = count + 1 # изменяет глобальную переменную
time.sleep(0.5) # потоки выполнения совместно используют count = count + 1 # глобальные объекты и переменные
threads = []
for i in range(100):
thread = threading.Thread(target=adder, args=())
thread.start()
threads.append(thread)
for thread in threads: thread.join() print(count)
Этот пример порождает 100 потоков выполнения, каждый из которых дважды изменяет одну и ту же глобальную переменную (с задержкой между ними, чтобы обеспечить чередование операций в различных потоках). При каждом запуске в Windows 7 этот сценарий будет воспроизводить различные результаты:
C:\...\PP4E\System\Threads> thread-add-random.py
189
C:\...\PP4E\System\Threads> thread-add-random.py
200
C:\...\PP4E\System\Threads> thread-add-random.py
194
C:\...\PP4E\System\Threads> thread-add-random.py
191
Это объясняется тем, что потоки выполнения произвольно перекрываются друг с другом по времени: интерпретатор не гарантирует, что инструкции - даже такие простые инструкции присваивания, как в данном примере, - будут выполнены полностью до того, как управление перейдет другому потоку выполнения (то есть они не являются атомарными). Когда один поток изменяет значение глобальной переменной, он может получить промежуточный результат, произведенный другим потоком. Как следствие этого мы наблюдаем непредсказуемое поведение. Чтобы заставить этот сценарий работать корректно, необходимо снова воспользоваться блокировками для синхронизации изменений - в какой бы момент мы ни запускали сценарий из примера 5.13, он всегда будет выводить число 200.
Пример 5.13. PP4E\System\Threads\thread-add-synch.py
"всегда выводит 200 - благодаря синхронизации доступа к глобальному ресурсу”
import threading, time count = 0
def adder(addlock): # совместно используемый объект блокировки
global count
with addlock: # блокировка приобретается/освобождается
count = count + 1 # автоматически time.sleep(0.5)
with addlock: # в каждый конкретный момент времени
count = count + 1 # только 1 поток может изменить значение переменной
addlock = threading.Lock()
threads = []
for i in range(100):
thread = threading.Thread(target=adder, args=(addlock,))
thread.start()
threads.append(thread)
for thread in threads: thread.join() print(count)
Некоторые простейшие операции в языке Python являются атомарными и не требуют синхронизации, тем не менее лучше все-таки предусматривать координацию потоков выполнения, если есть вероятность одновременных попыток изменения значения. Со временем может измениться не только набор атомарных операций, но и внутренняя реализация механизма потоков выполнения (такие изменения ожидаются в версии Python 3.2, как описывается далее).
Конечно, это во многом искусственный пример (порождать 100 потоков выполнения, чтобы в каждом из них дважды увеличить счетчик, - это определенно не самый практичный случай использования потоков!), но он наглядно иллюстрирует проблемы, с которыми можно столкнуться, когда существует вероятность параллельного изменения объекта или переменной, совместно используемой потоками. К счастью, для многих, если не для большинства применений, модуль queue, описываемый в следующем разделе, способен обеспечить автоматическую синхронизацию потоков выполнения.
Прежде чем двинуться дальше, я должен отметить, что помимо классов Thread и Lock в модуле threading имеются и другие высокоуровневые инструменты синхронизации доступа к совместно используемым объектам (например, Semaphore, Condition, Event) - много больше, чем позволяет вместить объем этой книги, поэтому за дополнительными подробностями обращайтесь к руководству по библиотеке. Дополнительные примеры использования потоков выполнения и дочерних процессов вы найдете в оставшейся части этой главы, а также среди примеров в разделах книги, посвященных реализации графического интерфейса и сетевых взаимодействий. В графических интерфейсах, например, мы будем использовать потоки, чтобы избежать их блокирования. Мы также будем порождать потоки и дочерние процессы в сетевых серверах, чтобы исключить вероятность отказа в обслуживании клиентов.
Кроме того, мы будем исследовать приемы использования модуля threading для завершения программы без применения метода join, но в соединении с очередями - которые являются темой следующего раздела.
Модуль queue
Синхронизировать доступ потоков выполнения к совместно используемым ресурсам можно с помощью блокировок, но часто в этом нет необходимости. Как уже упоминалось выше, на практике многопоточные программы часто организуются, как набор потоков производителей и потребителей, которые взаимодействуют между собой, помещая данные в общую очередь и извлекая их оттуда. Если очередь синхронизирует доступ к самой себе, она автоматически будет синхронизировать взаимодействия потоков выполнения.
Как раз такое хранилище данных реализует модуль queue из стандартной библиотеки. Он предоставляет стандартную очередь данных -список объектов Python, построенный по принципу «первый пришел, первый ушел» (first-in first-out, fifo), в котором добавление элементов производится с одного конца, а удаление - с другого. Подобно обычным спискам, очереди, реализуемые этим модулем, могут содержать объекты любых типов, включая объекты простых типов (строки, списки, словари и так далее) и более экзотических типов (экземпляры классов, произвольные вызываемые объекты, такие как функции и связанные методы, и многие другие).
Однако, в отличие от обычных списков, объект очереди автоматически управляет операциями приобретения и освобождения блокировки, благодаря чему в каждый конкретный момент времени изменять очередь может только один поток. Вследствие этого программы, использующие очереди для организации взаимодействий между потоками, изначально обеспечивают поддержку многопоточной модели выполнения и обычно не используют свои собственные блокировки для доступа к данным из потоков выполнения.
Подобно другим инструментам из арсенала поддержки потоков выполнения в языке Python, очереди удивительно просты в использовании. Так, сценарий в примере 5.14 порождает два потока-потребителя, которые ожидают появления данных в общей очереди, и четыре потока-производителя, периодически, через определенные интервалы времени, помещающие данные в очередь (для каждого из них установлена своя продолжительность интервала, чтобы имитировать выполнение длительных операций). Другими словами, эта программа запускает семь потоков выполнения (включая главный поток), шесть из которых обращаются к общей очереди параллельно.
Пример 5.14. PP4E\System\Threads\queuetest.py
"взаимодействие потоков производителей и потребителей посредством очереди”
numconsumers = 2 # количество потоков-потребителей
numproducers = 4 # количество потоков-производителей
nummessages = 4 # количество сообщений, помещаемых производителем
import _thread as thread, queue, time
safeprint = thread.allocate_lock() # в противном случае вывод может
# перемешиваться
dataQueue = queue.Queue() # общая очередь неограниченного размера
def producer(idnum):
for msgnum in range(nummessages): time.sleep(idnum)
dataQueue.put(‘[producer id=%d, count=%d]’ % (idnum, msgnum))
def consumer(idnum): while True:
time.sleep(0.1)
try:
data = dataQueue.get(block=False) except queue.Empty: pass else:
with safeprint:
print(‘consumer’, idnum, ‘got =>’, data)
if __name__ == ‘__main__’:
for i in range(numconsumers):
thread.start_new_thread(consumer, (i,)) for i in range(numproducers):
thread.start_new_thread(producer, (i,)) time.sleep(((numproducers-1) * nummessages) + 1) print(‘Main thread exit.’)
Прежде чем я покажу вывод этого сценария, я хочу подчеркнуть некоторые важные моменты в этом программном коде.
Аргумент или глобальная переменная?
Обратите внимание, что ссылка на очередь сохраняется в глобальной переменной. Благодаря этому очередь может использоваться всеми порожденными потоками выполнения (все они выполняются в одном процессе и в одном глобальном пространстве имен). Потоки изменяют сам объект очереди, а не ссылку в переменной, поэтому они точно так же могли бы работать с очередью, если бы она передавалась, как аргумент функции, выполняемой в потоке. Очередь является совместно используемым объектом в памяти, и неважно, каким способом поток обретет ссылку на него (полную версию сценария, фрагмент которого представлен ниже, вы найдете в файле queuetest2.py, в дереве примеров):
dataQueue = queue.Queue() # общий объект, неограниченный размер
def producer(idnum, dataqueue):
for msgnum in range(nummessages): time.sleep(idnum)
dataqueue.put(‘[producer id=%d, count=%d]’ % (idnum, msgnum))
def consumer(idnum, dataqueue): ...
if __name__ == ‘__main__’:
for i in range(numproducers):
thread.start_new_thread(producer, (i, dataQueue)) for i in range(numproducers):
thread.start_new_thread(producer, (i, dataQueue))
Завершение программы с дочерними потоками выполнения
Обратите также внимание, что сценарий завершает свою работу вместе с завершением главного потока, при том, что потоки-потребители продолжают выполнять свой бесконечный цикл. Этот прием прекрасно действует в Windows (и в большинстве других систем) - при использовании модуля _thread программа просто завершает свою работу вместе с главным потоком. Именно поэтому мы использовали функцию sleep в некоторых примерах - чтобы дать дочерним потокам возможность завершить свою работу, и именно поэтому нам нет необходимости беспокоиться о завершении потоков-потребителей, которые в данном примере выполняются в бесконечном цикле.
Однако при использовании альтернативного модуля threading программа не может завершиться, когда хотя бы один поток продолжает работу, если только он не был запущен, как поток-демон. В частности, программа завершается, когда в ней остаются только потоки-демоны. При создании потоки наследуют признак принадлежности к потокам-демонам от потока, породившего их. Главный поток в программах на языке Python не может быть демоном, тогда как потоки, созданные без помощи этого модуля, считаются демонами (включая некоторые потоки, создаваемые расширениями на языке C). Чтобы переопределить признак, унаследованный по умолчанию, можно вручную установить атрибут daemon объекта потока. Другими словами, потоки, не относящиеся к потокам-демонам, препятствуют завершению программы, и программы продолжают работать, пока не завершатся все потоки, созданные под управлением модуля threading.
Эту особенность можно рассматривать как достоинство или как недостаток, в зависимости от потребностей программы, - с одной стороны, когда не используется метод join, или когда главный поток не приостанавливается на некоторое время, она может принудительно завершать рабочие потоки; с другой стороны, она может препятствовать завершению программы, как показано в примере 5.14. Чтобы этот пример мог работать при использовании модуля threading, используйте следующее альтернативное решение (смотрите полную версию в файле queuetest3. py в дереве примеров, а также сценарий thread-countthreading.py - в качестве демонстрации того, где может пригодиться препятствование завершению):
import threading, queue, time
def producer(idnum, dataqueue): ...
def consumer(idnum, dataqueue): ...
if __name__ == ‘__main__’:
for i in range(numconsumers):
thread = threading.Thread(target=consumer, args=(i, dataQueue)) thread.daemon = True # иначе программа не завершится! thread.start()
waitfor = []
for i in range(numproducers):
thread = threading.Thread(target=producer, args=(i, dataQueue))
waitfor.append(thread)
thread.start()
for thread in waitfor: thread.join() # или большое значение в time.sleep() print(‘Main thread exit.’)
Мы еще вернемся к потокам-демонам и к проблеме завершения потоков в главе 10, когда будем изучать особенности реализации графических интерфейсов. Как мы увидим, в том контексте все происходит точно так же, за исключением того, что там главный поток обычно занимается обслуживанием графического интерфейса.
Запуск сценария
Теперь вернемся к примеру 5.14. Ниже приводится вывод этого примера после запуска на моем компьютере под управлением Windows. Обратите внимание, что несмотря на автоматическую координацию обмена данными между потоками с помощью очереди, в этом сценарии по-прежнему необходимо использовать блокировку для синхронизации доступа к стандартному потоку вывода - очередь синхронизирует обмен данными, но в некоторых программах все равно может потребоваться использовать блокировки для других целей. Как было показано в предыдущих примерах, если не использовать блокировку safeprint, вывод от разных потоков-потребителей может перемешиваться, поскольку есть вероятность, что поток-потребитель будет приостановлен в процессе выполнения операции вывода:
C:\...\PP4E\System\Threads> queuetest.py consumer 1 got => [producer id=0, count=0] consumer 0 got => [producer id=0, count=1] consumer 1 got => [producer id=0, count=2] consumer 0 got => [producer id=0, count=3] consumer 1 got => [producer id=1, count=0] consumer 1 got => [producer id=2, count=0] consumer 0 got => [producer id=1, count=1] consumer 1 got => [producer id=3, count=0] consumer 0 got => [producer id=1, count=2] consumer 1 got => [producer id=2, count=1] consumer 1 got => [producer id=1, count=3] consumer 1 got => [producer id=3, count=1] consumer 0 got => [producer id=2, count=2] consumer 1 got => [producer id=2, count=3] consumer 1 got => [producer id=3, count=2] consumer 1 got => [producer id=3, count=3]
Main thread exit.
Попробуйте поэкспериментировать с параметрами в начале этого сценария. Единственный потребитель, например, мог бы имитировать главный поток графического интерфейса. Ниже представлен вывод сценария с единственным потребителем - потоки-производители по-прежнему добавляют данные в очередь в достаточно случайном порядке, потому что потоки выполняются параллельно друг с другом и с потоком-потребителем:
C:\...\PP4E\System\Threads> queuetest.py consumer 0 got => [producer id=0, count=0] consumer 0 got => [producer id=0, count=1] consumer 0 got => [producer id=0, count=2] consumer 0 got => [producer id=0, count=3]
consumer
0
got =>
[producer
id=1, count=0]
consumer
0
got =>
[producer
id=2, count=0]
consumer
0
got =>
[producer
id=1, count=1]
consumer
0
got =>
[producer
id=3, count=0]
consumer
0
got =>
[producer
id=1, count=2]
consumer
0
got =>
[producer
id=2, count=1]
consumer
0
got =>
[producer
id=1, count=3]
consumer
0
got =>
[producer
id=3, count=1]
consumer
0
got =>
[producer
id=2, count=2]
consumer
0
got =>
[producer
id=2, count=3]
consumer
0
got =>
[producer
id=3, count=2]
consumer
0
got =>
[producer
id=3, count=3]
Main thread exit.
Кроме основных особенностей очередей, продемонстрированных в этом сценарии, очереди могут иметь фиксированный или неограниченный размер, а методы get и put могут блокировать или не блокировать вызывающий поток - подробное описание интерфейса очередей вы найдете в руководстве по стандартной библиотеке Python. Однако, поскольку мы только что попробовали сымитировать типичную структуру сценария с графическим интерфейсом, продолжим исследование этого понятия дальше.
Графические интерфейсы и потоки выполнения: предварительное знакомство
Мы еще вернемся к потокам выполнения и очередям и рассмотрим дополнительные примеры их использования, когда позже будем изучать приемы создания графических интерфейсов. В примере приложения PyMailGUI, представленном в главе 14, широко будут использоваться инструменты управления потоками выполнения и очередями, представленными здесь. В главах 10 и 9 будут обсуждаться особенности использования многопоточной модели выполнения в контексте библиотеки tkinter инструментов для построения графического интерфейса, - как только мы познакомимся с ней поближе. В этом разделе мы не будем погружаться в программный код, но отметим, что потоки выполнения обычно являются неотъемлемой частью большинства нетривиальных графических интерфейсов. Модель функционирования многих графических интерфейсов представляет собой комбинацию потоков выполнения, очередей и циклов, выполняемых по таймеру.
И вот почему. В контексте графического интерфейса любая операция, выполнение которой может быть заблокировано или занять продолжительное время, должна запускаться в параллельном потоке, чтобы графический интерфейс (главный поток) оставался активным и продолжал откликаться на действия пользователя. Подобные операции можно было бы запускать в виде отдельных процессов, однако эффективность потоков выполнения и поддерживаемая ими возможность совместного использования памяти процесса делает их идеальным инструментом для решения подобных задач. Кроме того, так как большая часть инструментов создания графических интерфейсов не позволяет обновлять интерфейс сразу из нескольких потоков, то внесение изменений в графический интерфейс будет ограничено главным потоком.
Так как только главный поток должен в общем случае изменять интерфейс, программы с графическим интерфейсом обычно принимают следующий вид: главный поток, который обслуживает интерфейс, и один или несколько долгоживущих потоков-производителей - по одному потоку для каждой задачи. Для синхронизации потоков все глобальные данные передаются между ними с помощью глобальных очередей: рабочие потоки посылают результаты, а поток, управляющий графическим интерфейсом, потребляет их.
Если говорить более определенно:
• Главный поток выполняет все изменения в графическом интерфейсе и запускает цикл, выполняющийся по таймеру, который выполняет периодическую проверку наличия новых данных в очереди для отображения на экране. В библиотеке tkinter, например, для периодической проверки очереди можно использовать метод after(msecs, func, *args). Так как такие события распространяются процессором событий графического интерфейса, все изменения в интерфейсе будут выполняться только в главном потоке (и часто это является обязательным требованием, из-за того что инструменты создания графических интерфейсов редко поддерживают многопоточную модель выполнения).
• Дочерние потоки вообще не выполняют операций с графическим интерфейсом. Они лишь производят данные и помещают их в очередь, откуда эти данные будут извлекаться главным потоком. При необходимости дочерние потоки могут помещать в очередь функции обратного вызова, которые будут вызываться главным потоком. В программах с графическим интерфейсом обычно недостаточно просто передать функцию обратного вызова, изменяющую интерфейс, из главного потока в дочерний и вызывать ее оттуда - функция будет выполняться в дочернем потоке и, возможно, параллельно с другими потоками.
Так как потоки обеспечивают более высокую отзывчивость графического интерфейса, чем цикл на основе таймера, такая организация приложения позволяет избежать блокирования интерфейса (потоки-производители работают параллельно с графическим интерфейсом) и не терять входящие события (потоки-производители действуют независимо от цикла событий графического интерфейса и выполняются с максимальной скоростью). Главный поток, обслуживающий графический интерфейс, будет отображать результаты из очереди так быстро, как только это возможно в контексте более медленного цикла событий графического интерфейса.
Кроме того, имейте в виду, что, независимо от наличия поддержки многопоточной модели выполнения в инструменте создания графического интерфейса, многопоточные программы с графическим интерфейсом по-прежнему должны придерживаться общих принципов построения многопоточных программ - может потребоваться синхронизировать доступ к общим ресурсам, если он выходит за рамки модели с очередью, совместно используемой производителями/потребителями. Если дочерние потоки должны изменять другие общие ресурсы, используемые главным потоком графического интерфейса, может потребоваться использовать блокировки, чтобы избежать их взаимовлияния. Например, дочерние потоки, загружающие и кэширующие сообщения электронной почты, не должны перекрываться по времени с другими потоками, использующими или изменяющими содержимое кэша. То есть одних только очередей может оказаться недостаточно. Если обязанность потоков не сводится к размещению своих данных в очередь, то многопоточные приложения с графическим интерфейсом должны учитывать проблемы, сопутствующие параллельной обработке данных.
Далее мы увидим, как можно реализовать многопоточную модель в графическом интерфейсе. Дополнительную информацию по этой теме вы найдете в дискуссии, посвященной использованию потоков выполнения при работе с инструментом tkinter создания графических интерфейсов в главе 9, в примерах реализации инструментов для работы с потоками выполнения и очередями в главе 10 и в примере приложения PyMailGUI в главе 14.
Далее в этой главе мы также встретимся с пакетом multiprocessing, поддержка процессов и очередей в котором предоставляет новые возможности реализации модели графического интерфейса, где вместо потоков выполнения используются процессы. Эта модель позволяет обойти ограничение GIL, но ее применение может отрицательно сказываться на производительности, в зависимости от платформы, и может оказаться вообще неприменимой в контексте потоков (эта модель не поддерживает прямой доступ к общим изменяемым объектам, хранящим информацию о состоянии потоков выполнения, однако она поддерживает механизм обмена сообщениями). Но сначала рассмотрим несколько интересных моментов, касающихся потоков.
Таймеры-потоки против таймеров графического интерфейса
Интересно отметить, что модуль threading экспортирует универсальную функцию Timer, которая, как и метод after виджетов в библиотеке tkinter, может использоваться для запуска другой функции по истечении указанного интервала времени:
Timer(N.M, somefunc).start() # вызовет функцию somefunc через N.M секунд
Объекты-таймеры имеют метод start(), запускающий таймер, а также метод cancel(), позволяющий отменить запланированное событие, а кроме того, ожидание в них реализовано в виде отдельного потока выполнения. Например, следующий пример выведет сообщение спустя 5.5 секунд:
>>> import sys
>>> from threading import Timer
>>> t = Timer(5.5, lambda: print('Spam!')) # дочерний поток >>> t.start()
>>> Spam!
Этот инструмент может пригодиться в самых разных ситуациях, но он не должен использоваться в графических интерфейсах: отложенная функция будет вызвана в контексте дочернего потока, а не в главном потоке графического интерфейса, поэтому она не должна выполнять изменения в графическом интерфейсе. Метод after из библиотеки tkinter, напротив, вызывается из цикла обработки событий главного потока выполнения и запускает указанную функцию в главном потоке, поэтому она свободно может изменять графический интерфейс.
Например, следующий пример выведет окно диалога через 5.5 секунд в контексте главного потока инструмента tkinter (в некоторых интерфейсах вам может также потребоваться запустить win. main loop()):
>>> from tkinter import Tk
>>> from tkinter.messagebox import showinfo
>>> win = Tk()
>>> win.after(5500, lambda: showinfo('Popup', 'Spam!'))
В последней строке здесь планируется однократный запуск функции в главном потоке выполнения графического интерфейса, но он не приостанавливает работу вызывающей программы и поэтому не блокирует графический интерфейс. Ниже приводится эквивалентная и более простая форма:
>>> win.after(5500, showinfo, 'Popup', 'Spam')
В следующей части книги, в главе 9, подробнее будет рассказываться о библиотеке tkinter и о методе after, а в главе 10 - о роли потоков выполнения в приложениях с графическим интерфейсом.
Подробнее о глобальной блокировке интерпретатора (GIL)
Эта тема относится к области низкоуровневого программирования, и во многих случаях можно и без этих знаний успешно организовать многопоточную работу в программах на языке Python, но тем не менее реализация механизма потоков в Python может оказывать влияние как на производительность, так и на стиль программирования. В этом разделе приводятся сведения об особенностях реализации и о некоторых их следствиях.
О реализации потоков выполнения в грядущей версии Python 3.2: В этом разделе описывается текущая реализация потоков выполнения, включая версию Python 3.1. К моменту написания этих строк версия Python 3.2 все еще находилась в стадии разработки, но одним из нововведений в ней наверняка будет новая версия GIL, обеспечивающая более высокую производительность, особенно в системах с многоядерными процессорами. Новая реализация GIL по-прежнему будет синхронизировать доступ к PVM (программный код на языке Python по-прежнему будет мультиплексироваться, как и ранее), но она будет использовать более эффективную схему переключения контекста, чем ныне используемая схема переключения через N-инструкций-в-байткоде.
Помимо всего прочего, текущая функция sys.setcheckinterval, вероятно, будет заменена таймером с поддержкой новой схемы. В частности, понятие интервала проверки необходимости переключения потоков выполнения будет ликвидировано и заменено абсолютным значением продолжительности, выраженным в секундах. Как ожидается, это значение по умолчанию будет равно 5 миллисекундам, но его можно будет изменять с помощью функции sys. setswitchinterval.
Кроме того, существовало множество планов по полной ликвидации GIL (включая проект Unladen Swallow, запущенный сотрудниками Google), однако до сих пор не было представлено ни одного варианта решения. Я не берусь предсказывать будущее, поэтому читайте документацию к новым версиям Python, чтобы оставаться в курсе.
Строго говоря, в настоящее время Python использует механизм глобальной блокировки интерпретатора (Global Interpreter Lock, GIL), представленный в начале этого раздела и обеспечивающий выполнение интерпретатором Python в каждый конкретный момент времени программного кода не более чем одного потока. Кроме того, чтобы дать каждому потоку возможность поработать, интерпретатор автоматически переключается между ними через равные промежутки времени (в Python 3.1 - путем освобождения и приобретения блокировки после выполнения некоторого числа инструкций в байт-коде), а также в начале длительных операций (например, в начале операций ввода/вывода в файлы).
Такая схема позволяет избежать проблем, могущих возникнуть, когда нескольким потокам выполнения одновременно может потребоваться обновить системные данные Python. Например, если двум потокам разрешить одновременно изменить счетчик ссылок на объект, результаты могут оказаться непредсказуемыми. Применение этой схемы может также повлечь тонкие неочевидные последствия. В частности, в примерах использования потоков, приведенных в данной главе, мы видели, что при выводе в stdout текст может повреждаться, если потоки не синхронизируют свои операции вывода с помощью блокировок.
Кроме того, хотя глобальная блокировка интерпретатора не разрешает одновременно выполнять более одного потока Python, этого недостаточно для обеспечения безопасности потоков в целом и это никак не решает проблемы синхронизации на более высоком уровне. Например, если одновременно несколько потоков пытаются изменить одну и ту же переменную, им обычно должен предоставляться исключительный доступ к объекту с помощью блокировок. В противном случае существует вероятность, что переключение потоков произойдет посреди байт-кода выражения, осуществляющего изменение.
Не обязательно использовать блокировки для обращения ко всем совместно используемым объектам, особенно если только один поток изменяет объект, а остальные только наблюдают за изменениями. Возьмите за правило всегда использовать блокировки для синхронизации потоков в тех случаях, когда возможна конкуренция в операции изменения, не полагаясь на текущий способ реализации потоков.
Интервал переключения потоков выполнения
В некоторых случаях параллельные изменения могут выполняться корректно и без применения блокировок, если сделать интервал переключения потоков настолько большим, чтобы каждый из потоков мог выполниться прежде чем будет переключен. Функция sys.setcheckinterval(N) устанавливает частоту, с которой интерпретатор будет выполнять такие операции, как переключение потоков и обработка сигналов.
Этот интервал измеряется в инструкциях байт-кода, выполняемых между переключениями. В большинстве программ не требуется изменять эту частоту, но с ее помощью можно регулировать производительность работы потоков. Установка более высоких значений приводит к тому, что переключение происходит реже: уменьшатся накладные расходы на переключение потоков, но потоки медленнее будут реагировать на события. Установка более низких значений обеспечит более высокую отзывчивость потоков на события, но увеличит накладные расходы на их переключение.
Атомарные операции
Из-за того, каким образом интерпретатор Python использует GIL для синхронизации доступа к виртуальной машине, ни для каких инструкций высокого уровня не гарантируется выполнение инструкции до конца до переключения на другой поток, но оно гарантируется для всех инструкций в байт-коде. Инструкции в байт-коде являются неделимыми, поэтому некоторые операции в языке Python обеспечивают безопасную работу с потоками. Такие операции называются атомарными, потому что их выполнение не может быть прервано - и при их использовании не требуется задействовать блокировки или очереди, чтобы избежать проблем, связанных с одновременными изменениями. Например, к моменту написания этих строк в стандартной реализации C Python выполняются атомарно: метод list.append, операции извлечения и некоторые операции присваивания значений переменным, обращение к элементам списков, ключам словарей и атрибутам объектов, а также некоторые другие операции. Другие операции, такие как x = x+1 (и вообще любые операции, при выполнении которых происходит чтение данных, их изменение и запись обратно), - нет.
Однако, как уже отмечалось выше, не следует полагаться на эти особенности, потому что они требуют глубокого понимания внутренней реализации интерпретатора и могут изменяться от версии к версии. На самом деле, набор атомарных операций может существенно измениться с введением более свободной от ограничений реализации потоков. На практике проще использовать блокировки для доступа ко всем глобальным переменным и общим объектам, чем пытаться запомнить, какие операции могут или не могут быть безопасными при одновременном использовании их в нескольких потоках выполнения.
Прикладной интерфейс потоков на языке C
Наконец, если вы собираетесь использовать смешанный программный код на языках Python и C, посмотрите также интерфейсы потоков, описываемые в стандартном руководстве по API Python/C. В многопоточных программах расширения на языке C должны освобождать и снова приобретать глобальную блокировку интерпретатора при выполнении длительных операций, чтобы позволить выполняться другим потокам Python. В частности, функции в расширениях на языке C, выполняющие продолжительные операции, должны освобождать блокировку на входе и приобретать на выходе, чтобы возобновить работу программного кода Python.
Обратите внимание: хотя программный код Python в разных потоках Python не может выполняться одновременно из-за синхронизации с помощью GIL, тем не менее фрагменты потоков с программным кодом на языке C такую возможность имеют. Параллельно может выполняться любое число потоков, при условии, что они действуют за пределами виртуальной машины Python. Потоки на языке C могут перекрываться во времени и с другими потоками на языке C, и с потоками Python, выполняемыми виртуальной машиной. Благодаря этому разделение программного кода по библиотекам на языке C может применяться в приложениях Python для использования преимуществ многопроцессорных систем.
Однако часто бывает проще использовать преимущества многопроцессорных систем за счет создания программ на языке Python, которые вместо потоков запускают параллельные процессы. Сложность программного кода, управляющего потоками и процессами, примерно одинаковая. Подробнее о расширениях на языке C и их требованиях к многопоточной модели выполнения рассказывается в главе 20. Тем не менее коротко отмечу, что в состав Python входят инструменты на языке C (среди них пара макроопределений для управления GIL), которые могут использоваться для обертывания продолжительных операций в программном коде расширений на языке C и позволяют параллельно выполняться другим потокам в программном коде на языке Python.
Альтернатива на основе процессов: пакет multiprocessing (описывается далее)
К настоящему моменту у вас должно сложиться общее представление о параллельно выполняющихся процессах и потоках, а также об инструментах в языке Python для управления ими. Далее в главе мы вернемся к этим идеям, когда будем знакомиться с пакетом multiprocessing - инструментом из стандартной библиотеки, объединяющим в себе простоту и переносимость потоков с преимуществами процессов, - за счет реализации прикладного интерфейса, напоминающего потоки, который вместо потоков запускает процессы. Он стремится решить проблемы переносимости поддержки процессов и ограничений на использование преимуществ многопроцессорных систем, накладываемых блокировкой GIL. Но в некоторых ситуациях он не может использоваться как замена приему ветвления процессов и накладывает ряд ограничений, которые отсутствуют при работе с потоками, проистекающих из особенностей модели процессов (например, изменяемые объекты не могут использоваться непосредственно, потому что их приходится копировать через границы процессов, а объекты, не поддерживающие возможность сериализации, такие как связанные методы, вообще не могут использоваться).
Пакет multiprocessing реализует набор инструментов, упрощающих такие задачи, как взаимодействие между процессами и передача кода завершения. Поэтому мы сначала исследуем поддержку этих возможностей в языке Python и попутно рассмотрим еще несколько примеров использования потоков выполнения и процессов.
Завершение программ
Как мы видели выше, в отличие от языка C, в Python нет функции «main». При запуске программы весь программный код верхнего уровня в файле (то есть в файле, имя которого указано в командной строке, на котором был выполнен щелчок в проводнике, и так далее) просто выполняется от начала и до конца. Обычно сценарии завершаются, когда интерпретатор достигает конца файла, но завершить программу можно и явно с помощью инструментов из модулей sys и os.
Завершение программ средствами модуля sys
Например, программу можно завершить раньше обычного, вызвав функцию sys.exit:
>>> sys.exit(N) # выход с кодом завершения N, в противном случае
# программа завершится по достижении конца сценария
Интересно отметить, что в действительности эта функция просто возбуждает встроенное исключение SystemExit. Поэтому его можно обычным образом перехватывать, чтобы выполнить завершающие действия. Если это исключение не перехватывать, интерпретатор завершит работу как обычно. Например:
C:\...\PP4E\System> python >>> import sys >>> try:
... sys.exit() # смотрите также: os._exit, Tk().quit()
... except SystemExit:
... print('ignoring exit')
ignoring exit
>>>
Некоторые программные инструменты, такие как отладчики, могут использовать эту особенность для предотвращения завершения программы. Фактически явное возбуждение встроенного исключения System-Exit с помощью инструкции raise эквивалентно вызову функции sys. exit. В практических сценариях в блоке try можно было бы перехватывать исключения завершения работы, возбуждаемые в любом месте программы. Сценарий в примере 5.15 завершается из выполняющейся функции.
Пример 5.15. PP4E\System\Exits\testexit_sys.py
def later(): import sys
print(‘Bye sys world’) sys.exit(42) print(‘Never reached’)
if __name__ == ‘__main__’: later()
Если запустить этот пример как самостоятельный сценарий, он завершится еще до того, как интерпретатор достигнет конца файла. Но поскольку функция sys.exit возбуждает исключение, в случае импортирования этой функции вызывающий программный код может перехватывать возбуждаемое исключение завершения и отменять его, либо предусматривать блок finally, который будет выполнен при завершении программы:
C:\...\PP4E\System\Exits> python testexit_sys.py
Bye sys world
C:\...\PP4E\System\Exits> python >>> from testexit_sys import later >>> try:
... later()
... except SystemExit:
... print('Ignored...')
Bye sys world Ignored...
>>> try:
... later()
... finally:
... print('Cleanup')
Bye sys world Cleanup
C:\...\PP4E\System\Exits> # процесс интерактивного сеанса завершился
Завершение программ средствами модуля os
Можно выйти из Python и другими способами. Например, в дочернем процессе в Unix обычно вызывается функция os._exit, а не sys.exit. Потоки можно завершать с помощью функции _thread.exit, а приложения с графическим интерфейсом на основе tkinter часто завершаются с помощью метода h<().quit (). С модулем tkinter мы познакомимся далее в этой книге, а сейчас поближе рассмотрим инструменты завершения программ в модуле os.
При вызове функции os._exit вызывающий процесс завершается сразу, не возбуждая исключения, которое можно перехватить и игнорировать. Фактически при таком завершении процесс прекращает работу, не выталкивая буферы потоков вывода и не вызывая обработчики, выполняющие заключительные операции (которые можно определить с помощью модуля atexit из стандартной библиотеки), поэтому в общем случае данная функция должна использоваться только дочерними процессами, когда не требуется выполнения действий по завершению всей программы. Пример 5.16 иллюстрирует основы использования этой функции.
Пример 5.16. PP4E\System\Exits\testexit_os.py
def outahere(): import os
print(‘Bye os world’) os._exit(99) print(‘Never reached’)
if __name__ == ‘__main__’: outahere()
В отличие от sys.exit, функция os._exit неуязвима для инструкций обработки исключений try/except и try/finally:
C:\...\PP4E\System\Exits> python testexit_os.py
Bye os world
C:\...\PP4E\System\Exits> python >>> from testexit_os import outahere >>> try:
... outahere()
... except:
... print('Ignored')
Bye os world # завершение процесса интерактивного сеанса
C:\...\PP4E\System\Exits> python >>> from testexit_os import outahere >>> try:
... outahere()
... finally:
... print('Cleanup')
Bye os world # ditto
Коды завершения команд оболочки
Обе функции завершения из модулей sys и os, с которыми мы только что познакомились, принимают аргумент, определяющий код завершения процесса (в функции из модуля sys он необязателен, но в функции из модуля os - необходим). После завершения программы этот код может запрашиваться оболочкой или программой, запустившей сценарий как дочерний процесс. В Linux, например, чтобы получить код завершения последней программы, запрашивается значение переменной оболочки status. По соглашению ненулевое значение указывает, что возникли какие-то проблемы:
[mark@linux]$ python testexit_sys.py
Bye sys world
[mark@linux]$ echo $status 42
[mark@linux]$ python testexit_os.py
Bye os world [mark@linux]$ echo $status
99
В последовательности команд попутная проверка кодов завершения может использоваться как простая форма связи между программами.
Можно также получить код завершения программы, запущенной другим сценарием. Например, как рассказывалось в главах 2 и 3, при запуске команд оболочки код завершения предоставляется как:
• Значение, возвращаемое функцией os.system
• Значение, возвращаемое методом close объекта os.popen (по историческим причинам для значения None возвращается код 0, что означает отсутствие ошибок)
• Значение, возвращаемое различными интерфейсами в модуле subprocess (например, возвращаемое значение функции call, значение атрибута returnvalue объекта Popen и возвращаемое значение метода
wait)
Кроме того, в случае, когда программа запускается приемом ветвления процессов, код завершения можно получить вызовом функций os.wait и os.waitpid в родительском процессе.
Получение кода завершения с помощью os.system и os.popen
Рассмотрим сначала случай с командами оболочки - в операционной системе Linux запускаются программы из примеров 5.15 и 5.16, производится чтение вывода этих сценариев через каналы и получение кодов завершения:
[mark@linux]$ python >>> import os
>>> pipe = os.popen('python testexit_sys.py')
>>> pipe.read()
‘Bye sys world\012’
>>> stat = pipe.close() # возвращает код завершения
>>> stat
10752
>>> hex(stat)
‘0x2a00’
>>> stat >> 8 # извлекает код завершения из битовой маски
42
>>> pipe = os.popen('python testexit_os.py')
>>> stat = pipe.close()
>>> stat, stat >> 8
(25344, 99)
В версии Cygwin Python под Windows этот пример действует точно так же. При использовании функции os.popen в Unix-подобных системах по причинам, которые мы не будем здесь рассматривать, код завершения помещается в определенные битовые позиции возвращаемого значения.
Код действительно находится там, но чтобы его увидеть, нужно сдвинуть результат вправо на восемь разрядов. Код завершения команд, выполняемых с помощью функции os.system, можно получить с помощью библиотечной функции:
>>> stat = os.system('python testexit_sys.py')
Bye sys world >>> stat, stat >> 8
(10752, 42)
>>> stat = os.system('python testexit_os.py')
Bye os world
>>> stat, stat >> 8
(25344, 99)
Все эти приемы действуют и в стандартной версии Python для Windows, однако в этой операционной системе код завершения уже не является битовой маской (проверяйте значение sys.platform, если ваша программа должна работать на обеих платформах):
C:\...\PP4E\System\Exits> python >>> os.system('python testexit_sys.py')
Bye sys world 42
>>> os.system('python testexit_os.py')
Bye os world 99
>>> pipe = os.popen('python testexit_sys.py')
>>> pipe.read()
‘Bye sys world\n’
>>> pipe.close()
42
>>>
>>> os.popen('python testexit_os.py').close()
99
Буферизация потока вывода: первый взгляд
Обратите внимание, что в последней проверке, в предыдущем фрагменте программного кода, не предпринимается попытка прочитать вывод команды. В подобных ситуациях может потребоваться запускать целевой сценарий в небуферизованном режиме, то есть запускать интерпретатор Python с флагом -u, или изменить сценарий, чтобы он выталкивал выходной буфер вручную с помощью функции sys.stdout.flush. В противном случае текст, выводимый в стандартный поток вывода, не будет вытолкнут из буфера стандартного потока вывода при вызове функции os._exit. По умолчанию при подключении канала, как в данном примере, стандартный поток вывода работает в режиме полной буферизации - при подключении к терминалу в буфер помещается только одна строка:
>>> pipe = os.popen('python testexit_os.py')
>>> pipe.read() # буферы не выталкиваются при выходе
>>> pipe = os.popen('python -u testexit_os.py') # принудительный
>>> pipe.read() # небуферизованный режим
‘Bye os world\n’
Странно, но, несмотря на то, что имеется возможность передавать функциям os.popen и subprocess.Popen аргумент, управляющий режимом и буферизацией, - в данном случае это не поможет. Аргументы передаются инструментам со стороны вызывающего процесса, с конца канала, работающего в порожденной программе, как поток ввода, а не как поток вывода:
>>> pipe = os.popen('python testexit_os.py', 'r', 1) # построчная буферизация >>> pipe.read() # но мой канал - это не поток вывода программы!
>>> from subprocess import Popen, PIPE
>>> pipe = Popen('python testexit_os.py', bufsize=1, stdout=PIPE) # для моего >>> pipe.stdout.read() # канала -
b’’ # не поможет
Аргументы, определяющие режим буферизации, воздействуют на поток вывода вызывающего процесса, через который он записывает данные в стандартный поток ввода команды, а не на поток вывода команды, откуда вызывающий процесс читает данные.
При необходимости запускаемый сценарий может сам, вручную выталкивать выходные буферы - периодически или перед принудительным завершением. Подробнее о буферизации мы поговорим, когда будем обсуждать возможность возникновения ситуации взаимоблокировки далее в этой главе, и еще раз - в главах 10 и 12, где мы узнаем, как все это увязывается с сокетами. Поскольку мы вспомнили про модуль subprocess, рассмотрим теперь предоставляемые им инструменты завершения программ.
Получение кода завершения с помощью модуля subprocess
Модуль subprocess позволяет получить код завершения различными способами, как было показано в главах 2 и 3 (значение None в атрибуте returncode указывает, что дочерний процесс еще не завершился):
C:\...\PP4E\System\Exits> python
>>> from subprocess import Popen, PIPE, call
>>> pipe = Popen('python testexit_sys.py', stdout=PIPE)
>>> pipe.stdout.read()
b’Bye sys world\r\n’
>>> pipe.wait()
42
>>> call('python testexit_sys.py')
Bye sys world 42
>>> pipe = Popen('python testexit_sys.py', stdout=PIPE)
>>> pipe.communicate()
(b’Bye sys world\r\n’, None)
>>> pipe.returncode
42
Модуль subprocess действует аналогично и на Unix-подобных платформах, таких как Cygwin, но в отличие от функции os.popen, код завершения не преобразуется в битовую маску, и поэтому он совпадает с результатом в Windows (обратите внимание, что при использовании в Cygwin и в Unix-подобных системах требуется установить аргумент shell=True, как мы узнали в главе 2, тогда как в Windows этот аргумент требуется установить только для запуска встроенных команд оболочки, таких как dir):
[C:\...\PP4E\System\Exits]$ python
>>> from subprocess import Popen, PIPE, call
>>> pipe = Popen('python testexit_sys.py', stdout=PIPE, shell=True)
>>> pipe.stdout.read()
b’Bye sys world\n’
>>> pipe.wait()
42
>>> call('python testexit_sys.py', shell=True)
Bye sys world 42
Код завершения процесса и совместно используемая информация
Теперь, чтобы узнать, как получить код завершения процесса, порожденного ветвлением, напишем простую программу, выполняющую ветвление: сценарий в примере 5.17 порождает дочерние процессы и выводит коды их завершения, возвращаемые функцией oswait, пока не будет нажата клавиша q”.
Пример 5.17. PP4E\System\Exits\testexit_fork.py
порождает дочерние процессы и получает коды их завершения вызовом функции os.wait; прием ветвления может использоваться в Unix и Cygwin, но он не работает в стандартной версии Python 3.1 для Windows;
примечание: порождаемые потоки выполнения совместно используют глобальные переменные, но каждый процесс имеет собственные копии этих переменных (однако при ветвлении процессов файловые дескрипторы используются совместно) --exitstat здесь всегда имеет одно и то же значение, но может отличаться в случае использования потоков;
import os
exitstat = 0
def child(): # здесь можно вызвать os.exit для завершения
global exitstat # изменит глобальную переменную этого процесса
exitstat += 1 # код завершения для функции wait родителя
print(‘Hello from child’, os.getpid(), exitstat) os._exit(exitstat) print(‘never reached’)
def parent(): while True:
newpid = os.fork() # запустить новую копию процесса if newpid == 0: # если это копия, вызвать функцию child
child() # ждать ввода ‘q’ с консоли
else:
pid, status = os.wait()
print(‘Parent got’, pid, status, (status >> 8)) if input() == ‘q’: break
if__name__== ‘__main__’: parent()
Если запустить эту программу в Linux, Unix или Cygwin (не забывайте, что функция fork не работает в стандартной версии Python для Windows, - по крайней мере, когда я работал над четвертым изданием этой книги), она выведет следующие результаты:
[C:\...\PP4E\System\Exits]$ python testexit_fork.py
Hello from child 5828 1
Parent got 5828 256 1
Hello from child 9540 1
Parent got 9540 256 1
Hello from child 3152 1
Parent got 3152 256 1
q
Если внимательно изучить этот вывод, можно заметить, что код завершения (последнее выводимое число) всегда одинаков - 1. Поскольку ответвленные процессы начинают жизнь как копии создавших их процессов, они также получают копию глобальной памяти. Поэтому каждый дочерний процесс получает и изменяет собственную глобальную переменную exitstat, не трогая экземпляров этой переменной в других процессах. В то же время дочерние процессы получают копии файловых дескрипторов, которые используются совместно с родительским процессом, и именно поэтому вывод от дочерних процессов попадает в то же самое место.
Код завершения потока и совместно используемая информация
В отличие от процессов, потоки выполняются параллельно внутри одного и того же процесса и совместно используют глобальную память. Все потоки в примере 5.18 изменяют одну и ту же глобальную переменную exitstat.
Пример 5.18. PP4E\System\Exits\testexit_thread.py
порождает потоки выполнения и следит за изменениями в глобальной памяти; обычно потоки завершаются при возврате из выполняемой в них функции, но поток может завершиться, вызвав функцию _thread.exit(); функция _thread.exit играет ту же роль, что и функция sys.exit, и возбуждает исключение SystemExit; потоки взаимодействуют через глобальные переменные, по мере надобности блокируемые; ВНИМАНИЕ: на некоторых платформах может потребоваться придать атомарность вызовам функций print/input -- из-за совместно используемых потоков ввода-вывода;
import _thread as thread exitstat = 0
def child():
global exitstat # используется глобальная переменная процесса,
exitstat += 1 # совместно используемая всеми потоками
threadid = thread.get_ident()
print(‘Hello from child’, threadid, exitstat)
thread.exit()
print(‘never reached’)
def parent(): while True:
thread.start_new_thread(child, ()) if input() == ‘q’: break
if __name__ == ‘__main__’: parent()
Ниже показано, как действует этот сценарий в Windows, - в отличие от ветвления процессов, потоки выполнения поддерживаются и в стандартной версии Python для Windows. Все потоки получают разные идентификаторы - они произвольные, но уникальные среди активных потоков, поэтому их можно использовать в качестве ключей словаря для сохранения информации о потоках (на некоторых платформах идентификаторы потоков могут повторно использоваться после их завершения):
C:\...\PP4E\System\Exits> python testexit_thread.py
Hello from child 4908 1
Hello from child 4860 2
Hello from child 2752 3
Hello from child 8964 4
q
Обратите внимание, что значение глобальной переменной exitstat в этом сценарии изменяется каждым потоком выполнения, - из-за того, что потоки совместно используют глобальную память процесса. Эта особенность часто используется для организации взаимодействий между потоками. Вместо того чтобы возвращать коды завершения, потоки могут присваивать значения глобальным переменным модуля или модифицировать изменяемые объекты, а для синхронизации доступа к совместно используемым элементам они могут использовать блокировки и очереди, если это необходимо. В данном сценарии также может возникнуть потребность в синхронизации потоков для изменения глобального счетчика, если он когда-либо будет использоваться для решения практических задач. Может потребоваться синхронизировать даже обращения к функциям print и input, если на используемой платформе потоки могут одновременно обращаться к потокам ввода-вывода. В этом простом демонстрационном сценарии мы отказались от использования блокировок, предположив, что потоки не будут обращаться к этим операциям одновременно.
Как мы уже знаем, работа потока завершается нормальным образом и без сообщений, когда происходит возврат из функции, запущенной потоком, и значение, возвращаемое функцией, игнорируется. Кроме того, может быть вызвана функция _thread.exit для завершения вызвавшего ее потока явно и тихо. Эта функция действует почти в точности как sys. exit (но не принимает аргумента с кодом завершения) и возбуждает исключение SystemExit в вызвавшем ее потоке. Поэтому поток можно также досрочно завершить, вызвав функцию sys.exit или непосредственно возбудив исключение SystemExit. Следите, однако, за тем, чтобы не вызвать внутри функции потока функцию os._exit, - это может привести к странным результатам (на моей системе Linux в результате подвешивался весь процесс, а в Windows уничтожались все потоки процесса!).
В альтернативном модуле threading реализация потоков не имеет метода, эквивалентного функции _thread.exit(), но, так как единственное действие, которое выполняет последний, - это возбуждение исключения SystemExit, применение этой операции при использовании модуля threading даст такой же эффект - поток немедленно и тихо завершит работу, как, например, в следующем фрагменте (этот программный код находится в файле testexit-threading.py в дереве примеров):
import threading, sys, time
def action():
sys.exit() # или возбуждение исключения SystemExit() print(‘not reached’)
threading.Thread(target=action).start()
time.sleep(2)
print(‘Main exit’)
Помните также, что потоки выполнения и процессы имеют собственные модели продолжительности жизни, которые мы исследовали выше. Напомним, что если дочерние потоки продолжают выполняться, то поведение, обеспечиваемое двумя модулями работы с потоками, будет различаться - на большинстве платформ программа завершится, если главный поток был создан с помощью инструментов модуля _thread, но не сможет завершиться, если использовался модуль threading и все дочерние потоки не были запущены, как потоки-демоны. В случае использования процессов является нормальным, когда дочерние процессы «переживают» своего родителя. Эту отличительную черту процессов легко объяснить, если помнить, что потоки выполнения - это всего лишь вызовы функций внутри процесса, а процессы - это более автономные и независимые единицы.
При правильном применении код завершения можно использовать для обнаружения ошибок и в простых протоколах обмена данными в системах, образуемых сценариями командной строки. Но при этом следует подчеркнуть, что в большинстве сценариев завершение совпадает с достижением конца исходного файла, а большинство функций потоков выполнения просто возвращают управление - явное завершение обычно предусматривается только для исключительных ситуаций и только в заданных контекстах. Взаимодействие между программами обычно обеспечивается более богатым набором инструментов, чем просто передача целочисленных кодов завершения, а каким - рассказывается в следующем разделе.
Взаимодействия между процессами
Как мы видели выше, когда сценарии порождают потоки выполнения - задачи, выполняемые параллельно внутри программы, - потоки могут естественным образом поддерживать связь друг с другом путем изменения и чтения переменных и объектов в совместно используемой глобальной памяти. Сюда относятся как доступные переменные и атрибуты, так и изменяемые объекты. Мы видели также, что следует позаботиться об использовании блокировок для синхронизации доступа к совместно используемым объектам, если есть вероятность одновременного их изменения из разных потоков. Потоки выполнения предлагают достаточно простую модель взаимодействий, и модуль queue во многих ситуациях реализует синхронизацию практически автоматически.
Все становится намного сложнее, когда сценарии запускают дочерние процессы и программы, вообще не имеющие совместно используемой памяти. Если определить виды взаимодействий, которые могут осуществляться между программами, то окажется, что большинство вариантов мы уже рассмотрели в этой и в предыдущих главах. Например, ниже перечислены простые механизмы, которые могут рассматриваться, как инструменты взаимодействий между программами:
• Простые файлы
• Аргументы командной строки
• Коды завершения программ
• Переадресация стандартных потоков ввода-вывода
• Каналы, создаваемые с помощью функции os.рореn и модуля subprocess
Например, передача параметров в командной строке и запись в потоки ввода позволяет передавать параметры выполнения программ; чтение потоков вывода и кодов завершения дает возможность получать результаты. Поскольку порождаемыми программами наследуются значения переменных окружения, их также можно рассматривать, как один из способов передачи контекста. Каналы, создаваемые при помощи функции os.рореn или модуля subprocess, позволяют организовать еще более динамические взаимодействия: данные могут передаваться между программами в произвольные моменты времени, не только во время запуска или завершения.
Помимо этих механизмов в библиотеке Python есть и другие средства организации взаимодействий между процессами (Inter-Process Communication, IPC). К ним относятся сокеты, разделяемая память, сигналы, анонимные и именованные каналы и другие. Некоторые из них являются более переносимыми, некоторые менее переносимыми, и все они различаются по сложности и сфере использования. Например:
• Сигналы позволяют программам передавать простые уведомления другим программам.
• Анонимные каналы позволяют обмениваться данными потокам выполнения и родственным процессам, совместно использующим файловые дескрипторы, но этот механизм опирается на модель ветвления процессов в Unix-подобных системах, которая не является переносимой.
• Именованные каналы, отображаются в файловую систему - они позволяют обмениваться данными полностью независимым программам, но они доступны в Python не на всех платформах.
• Сокеты, отображаются на общесистемный набор номеров сетевых портов - они точно так же позволяют организовать обмен данными между произвольными программами, действующими на одном компьютере, но при этом предоставляют возможность взаимодействий по сети с программами, выполняющимися на удаленном компьютере, и к тому же представляют собой наиболее переносимый вариант.
Хотя некоторые из них могут использоваться, как механизмы взаимодействий между потоками выполнения, но истинная их мощь становится видна, когда они используются для организации взаимодействий между отдельными процессами, вообще не имеющими общей памяти.
В данном разделе мы познакомимся с каналами (анонимными и именованными), а также с сигналами. Помимо этого здесь мы впервые встретимся с сокетами, но только в виде предварительного знакомства. Сокеты могут использоваться для организации взаимодействий процессов, выполняющихся на одном компьютере, но так как основное их назначение заключается в работе с сетями, большую часть подробностей мы оставим до части книги, где будет рассказываться о разработке приложения для Интернета.
Программисты на языке Python могут пользоваться и другими механизмами IPC (например, разделяемой памятью, доступ к которой предоставляется модулем mmap), о которых здесь не рассказывается из-за недостатка места. Если вас интересует что-то более специальное, ищите в руководствах Python и на веб-сайте подробности использования других схем IPC.
По окончании этого раздела мы также исследуем пакет multiprocessing, который предлагает дополнительные и переносимые механизмы IPC, являющиеся частью его универсального API запуска процессов, включая разделяемую память, а также каналы и очереди для передачи произвольных объектов Python в сериализованном виде. Но сначала познакомимся с более традиционными подходами.
Анонимные каналы
Каналы, как механизм взаимодействия программ, реализуются операционной системой, а стандартная библиотека Python лишь обеспечивает доступ к ним. Каналы - это однонаправленные потоки ввода-вывода, по своему действию напоминающие буфер в совместно используемой памяти, интерфейс которого с обеих сторон похож на простой файл. В самом типичном случае использования одна программа пишет данные с одного конца канала, а вторая читает их с другого конца. Каждая из программ видит только свой конец канала и обрабатывает его с помощью обычных функций для работы с файлами.
Большую часть работы с каналами проделывает операционная система. Например, функции для чтения данных из канала обычно блокируют вызывающую программу, пока данные не станут доступны (то есть будут отправлены программой на другом конце), вместо того чтобы возвращать признак конца файла. Кроме того, функция чтения из канала всегда возвращает самые «старые» данные, записанные в канал, то есть каналы реализуют модель «первым пришел, первым ушел» - данные, которые были записаны раньше, будут прочитаны в первую очередь. Такие особенности позволяют использовать каналы для синхронизации выполнения независимых программ.
Каналы бывают двух видов - анонимные и именованные. Именованные каналы (иногда их называют «fifo») представляются в компьютере в виде файла. Так как именованные каналы фактически являются внешними файлами, взаимодействующие процессы вообще могут быть не связаны родственными узами - они могут быть совершенно независимыми программами.
Анонимные каналы, напротив, существуют только внутри процессов и обычно используются вместе с приемом ветвления процессов, как средство связи родительского и порожденного дочернего процессов в приложении: родитель и потомок общаются через совместно используемые дескрипторы файлов каналов. Потоки выполнения действуют в одном и том же процессе и совместно используют всю глобальную память, поэтому анонимные каналы могут использоваться и для взаимодействий между ними.
Основы анонимных каналов
Поскольку анонимные каналы являются наиболее традиционным инструментом, мы познакомимся с ними в первую очередь. Сценарий в примере 5.19 создает копию вызывающего процесса с помощью функции os.fork (с ветвлением процессов мы познакомились выше в этой главе). После ветвления исходный родительский процесс и его дочерняя копия общаются между собой через канал, созданный функцией os.pipe перед ветвлением. Функция os.pipe возвращает кортеж с двумя дескрипторами файлов - низкоуровневыми идентификаторами файлов, с которыми мы познакомились в главе 4, представляющими входной и выходной концы канала. Так как ответвленный дочерний процесс получает копии дескрипторов файлов своего родителя, то при записи в дескриптор выходного конца канала в дочернем процессе данные посылаются обратно родителю по каналу, созданному до создания дочернего процесса.
Пример 5.19. PP4E\System\Processes\pipe1.py
import os, time
def child(pipeout): zzz = 0 while True:
time.sleep(zzz) # заставить родителя подождать
msg = (‘Spam %03d’ % zzz).encode() # каналы - двоичные файлы os.write(pipeout, msg) # отправить данные родителю
zzz = (zzz+1) % 5 # переход к 0 после 4
def parent():
pipein, pipeout = os.pipe() # создать канал с 2 концами
if os.fork() == 0: # создать копию процесса
child(pipeout) # в копии вызвать child
else: # в родителе слушать канал
while True:
line = os.read(pipein, 32) # остановиться до получения данных
print(‘Parent %d got [%s] at %s’ % (os.getpid(), line,
time.time()))
parent()
Если запустить эту программу в Linux, Cygwin или в другой Unix-подобной системе (функция pipe имеется в стандартной реализации Python для Windows, а вот функция fork - нет), то родительский процесс при каждом вызове os.read будет ждать, пока дочерний процесс отправит данные в канал. Здесь дочерний и родительский процессы действуют почти как клиент и сервер - родитель запускает дочерний процесс и ждет от него инициации обмена.24 Для имитации длительных операций дочерний процесс заставляет родителя ждать каждое следующее сообщение на одну секунду дольше предыдущего с помощью вызова функции time.sleep, пока задержка не достигнет четырех секунд. Когда счетчик задержки zzz становится равным 005, он сбрасывается обратно в 000, и отсчет начинается сначала:
[C:\..
\PP4E\System\Processes]$ python pipe1.py
Parent
6716
got
[b
’Spam
000
’] at
1267996104.53
Parent
6716
got
[b
’Spam
001
’] at
1267996105.54
Parent
6716
got
[b
’Spam
002
’] at
1267996107.55
Parent
6716
got
[b
’Spam
003
’] at
1267996110.56
Parent
6716
got
[b
’Spam
004
’] at
1267996114.57
Parent
6716
got
[b
’Spam
000
’] at
1267996114.57
Parent
6716
got
[b
’Spam
001
’] at
1267996115.59
Parent
6716
got
[b
’Spam
002
’] at
1267996117.6
Parent
6716
got
[b
’Spam
003
’] at
1267996120.61
Parent
6716
got
[b
’Spam
004
’] at
1267996124.62
Parent
6716
got
[b
’Spam
000
’] at
1267996124.62
Parent
6716
got
[b
’Spam
001
’] at
1267996125.63
...и так далее: Ctrl-C для выхода...
Обратите внимание, что родитель принимает из канала строку байтов. Данные через простые каналы обычно передаются в виде строк байтов, если они обслуживаются с применением инструментов для работы с дескрипторами файлов, с которыми мы встречались в главе 4 (как мы видели там, инструменты чтения из дескрипторов и записи в дескрипторы, имеющиеся в модуле os, всегда возвращают и принимают строки байтов). Именно поэтому мы вынуждены в дочернем процессе вручную кодировать текст в строку байтов перед записью в канал - операция форматирования строк не может применяться к строкам байтов. Как будет показано в следующем разделе, дескриптор канала можно обернуть объектом текстового файла, как мы делали это в примерах главы 4, но этот прием обеспечит лишь автоматическое кодирование и декодирование при передаче данных средствами объекта, тогда как внутри канала данные все равно будут передаваться в форме строк байтов.
Обертывание дескрипторов канала объектами файлов
При внимательном рассмотрении вывода предыдущего сценария можно заметить, что когда счетчик задержки в дочернем процессе достигает значения 004, родительский процесс получает из канала сразу два сообщения - дочерний процесс записывает два различных сообщения, но на некоторых платформах или при определенных настройках (отличных от тех, что используются здесь) они могут оказаться достаточно близки по времени и получены родителем как один блок данных. В действительности родитель каждый раз слепо запрашивает чтение не более 32 байтов, но получает тот текст, который есть в канале.
Чтобы отделять одно сообщение от другого, можно определить для канала символ-разделитель. Для этого можно использовать символ конца строки, так как можно обернуть дескриптор канала объектом файла с помощью функции os.fdopen и использовать его метод readline для поиска в канале очередного разделителя \n. Кроме того, этот прием позволит использовать более мощные инструменты объектов текстовых файлов, с которыми мы познакомились в главе 4. Такая схема реализована в примере 5.20.
Пример 5.20. PP4E\System\Processes\pipe2.py
# аналогичен сценарию pipe1.py, но обертывает входной дескриптор канала
# объектом файла для обеспечения построчного чтения данных,
# и в обоих процессах закрывает неиспользуемый дескриптор канала
import os, time
def child(pipeout): zzz = 0 while True:
time.sleep(zzz) # заставить родителя подождать
msg = (‘Spam %03d\n’ % zzz).encode() # каналы - двоичные файлы в 3.X
os.write(pipeout, msg) # отправить данные родителю
zzz = (zzz+1) % 5 # переход к 0 через 5 итераций
def parent():
pipein, pipeout = os.pipe() # создать канал с 2 концами
if os.fork() == 0: # дочерний процесс пишет в канал
os.close(pipein) # закрыть дескриптор ввода
child(pipeout)
else: # в родителе слушать канал
os.close(pipeout) # закрыть дескриптор вывода
pipein = os.fdopen(pipein) # создать объект текстового файла
while True:
line = pipein.readline()[:-1] # остановиться до получения данных print(‘Parent %d got [%s] at %s’ % (os.getpid(), line,
time.time()))
parent()
Эта версия расширена тем, что закрывает неиспользуемые концы каналов в каждом процессе (например, после ветвления родительский процесс закрывает свою копию выходного конца канала, в который пишет дочерний процесс); обычно программы должны закрывать неиспользуемые концы каналов. В этой новой версии родителю при каждом чтении из канала гарантированно возвращается одно сообщение дочернего процесса, потому что при записи все они разделяются маркерами:
...и так далее: Ctrl-C для выхода...
Обратите внимание, что в этой версии текстовые данные теперь возвращаются в виде объекта str, так как функция os.fdopen по умолчанию устанавливает режим r при открытии файла. Как уже упоминалось, обмен данными через каналы обычно происходит с использованием строк байтов, когда дескрипторы используются непосредственно, с применением инструментов из модуля os, но обертывание дескрипторов объектами файлов позволяет использовать для представления данных строки str. В этом примере декодирование байтов в строку str в родительском процессе выполняется операцией чтения. Использование функции os.fdopen и текстового режима в дочернем процессе позволило бы избежать необходимости кодирования данных вручную, но это кодирование в любом случае выполнялось бы объектом файла (хотя кодирование символов ASCII, как в данном примере, является достаточно тривиальной операцией). Что касается простых файлов, лучший режим обработки данных в канале определяется самой их природой.
Анонимные каналы и потоки выполнения
Функция os.fork, используемая в примерах из предыдущего раздела, недоступна в стандартной версии Python для Windows, но функция os.pipe доступна. Так как все потоки выполнения работают в рамках одного процесса и совместно используют дескрипторы файлов (и всю глобальную память), это позволяет использовать анонимные каналы для синхронизации потоков выполнения. Это, возможно, более низкоуровневый механизм, чем очереди или общие объекты, и тем не менее он обеспечивает дополнительное средство организации взаимодействий между потоками выполнения. Так, в примере 5.21 демонстрируется тот же способ обмена данными с помощью канала, но уже между потоками, а не между процессами.
Пример 5.21. PP4E\System\Processes\pipe-thread.py
# анонимные каналы и потоки выполнения вместо процессов;
# эта версия работает и в Windows
import os, time, threading
def child(pipeout): zzz = 0 while True:
time.sleep(zzz) # заставить родителя подождать
msg = (‘Spam %03d’ % zzz).encode() # каналы - двоичные файлы os.write(pipeout, msg) # отправить данные родителю
zzz = (zzz+1) % 5 # переход к 0 после 4
def parent(pipein): while True:
line = os.read(pipein, 32) # остановиться до получения данных
print(‘Parent %d got [%s] at %s’ % (os.getpid(), line, time.time()))
pipein, pipeout = os.pipe()
threading.Thread(target=child, args=(pipeout,)).start()
parent(pipein)
Так как стандартная версия Python для Windows поддерживает потоки выполнения, данный сценарий будет работать и в Windows. Вывод сценария похож на предыдущий, но взаимодействующими сторонами здесь являются потоки выполнения, а не процессы (обратите внимание, что из-за бесконечных циклов по крайней мере один из потоков выполнения может не завершиться после нажатия комбинации Ctrl-C -чтобы остановить процесс python.exe, выполняющий этот сценарий, в Windows может потребоваться вызвать Диспетчер задач (Task Manager) или закрыть окно консоли):
C:\...\PP4E\System\Processes> pipe-thread.py Parent 8876 got [b’Spam 000’] at 1268579215.71 Parent 8876 got [b’Spam 001’] at 1268579216.73 Parent 8876 got [b’Spam 002’] at 1268579218.74 Parent 8876 got [b’Spam 003’] at 1268579221.75 Parent 8876 got [b’Spam 004’] at 1268579225.76 Parent 8876 got [b’Spam 000’] at 1268579225.76 Parent 8876 got [b’Spam 001’] at 1268579226.77 Parent 8876 got [b’Spam 002’] at 1268579228.79
...и так далее: Ctrl-C или Диспетчер задач для выхода...
Двунаправленный обмен данными с помощью анонимных каналов
Обычно каналы позволяют данным перемещаться только в одном направлении - один конец является входом, другой выходом. А как быть, если потребуется организовать общение между программами в обоих направлениях? Например, одна программа может посылать другой запрос на информацию и ждать получения этой информации. Один канал не может справиться с такими двунаправленными переговорами, но это можно реализовать с помощью двух каналов: один канал используется для передачи запроса, а второй - для пересылки ответа запросившей программе.
Так происходит во множестве практических применений. Например, однажды я написал графический интерфейс для отладчика командной строки C-подобного языка программирования и связал два процесса каналами, используя описываемый прием. Графический интерфейс запускался как отдельный процесс, который конструировал и отправлял команды отладчику командной строки в его поток ввода через канал, а затем анализировал результаты, возвращаемые отладчиком через его поток вывода. Графический интерфейс выступал в роли программиста, вводящего команды с клавиатуры, и как клиент отладчика-сервера. Вообще говоря, возможность запускать программы командной строки как дочерние процессы, с потоками ввода-вывода, подключенными к каналам, позволяет добавлять новые интерфейсы к старым программам. Простой пример реализации графического интерфейса подобного рода мы увидим в главе 10.
Модуль в примере 5.22 демонстрирует один из способов реализации идеи связывания стандартных потоков ввода и вывода двух программ. В нем функция spawn запускает новую дочернюю программу и соединяет потоки ввода и вывода родительской программы с потоками ввода и вывода дочерней программы. Это означает, что:
• Когда родитель читает из своего стандартного потока ввода, происходит чтение текста, отправленного дочерней программой в свой стандартный поток вывода.
• Когда родитель записывает в свой стандартный поток вывода, происходит отправка данных в стандартный поток ввода дочерней программы.
В итоге две независимые программы обмениваются между собой данными через свои стандартные потоки ввода-вывода.
Пример 5.22. PP4E\System\Processes\pipes.py
запускает дочерний процесс/программу, соединяет свои потоки stdin/stdout с потоками stdout/stdin дочернего процесса -- операции чтения и записи на стороне родительского процесса отображаются на стандартные потоки ввода-вывода дочерней программы; напоминает соединение потоков с помощью модуля subprocess;
import os, sys
def spawn(prog, *args): # имя программы, аргументы командной строки
stdinFd = sys.stdin.fileno() # получить дескрипторы потоков
stdoutFd = sys.stdout.fileno() # обычно stdin=0, stdout=1
parentStdin, childStdout = os.pipe() # создать два канала IPC childStdin, parentStdout = os.pipe() # pipe возвращает (inputfd, outoutfd) pid = os.fork() # создать копию процесса
if pid:
os.close(childStdout) # в родительском после ветвления:
os.close(childStdin) # закрыть дочерние концы в родителе
os.dup2(parentStdin, stdinFd) # копия sys.stdin = pipe1[0] os.dup2(parentStdout, stdoutFd) # копия sys.stdout = pipe2[1] else:
os.close(parentStdin) # в дочернем после ветвления:
os.close(parentStdout) # закрыть родительские концы
os.dup2(childStdin, stdinFd) # копия sys.stdin = pipe2[0]
os.dup2(childStdout, stdoutFd) # копия sys.stdout = pipe1[1]
args = (prog,) + args
os.execvp(prog, args) # запустить новую программу
assert False, ‘execvp failed!’ # os.exec никогда не вернется сюда
if__name__== ‘__main__’:
mypid = os.getpid()
spawn(‘python’, ‘pipes-testchild.py’, ‘spam’) # породить дочернюю прогр.
print(‘Hello 1 from parent’, mypid) # в stdin дочерней прогр.
sys.stdout.flush() # вытолкнуть буфер stdio
reply = input() # из потока вывода потомка
sys.stderr.write(‘Parent got: “%s”\n’ % reply) # stderr не связан
# с каналом!
print(‘Hello 2 from parent’, mypid)
sys.stdout.flush()
reply = sys.stdin.readline()
sys.stderr.write(‘Parent got: “%s”\n’ % reply[:-1])
Функция spawn в этом модуле не работает под управлением стандартной версии Python для Windows (не забывайте, что функции fork в этой системе пока нет). В действительности большинство функций, используемых в этом модуле, отображаются непосредственно в системные вызовы Unix (и могут ужаснуть разработчиков, которые не пишут для Unix!). С некоторыми из этих функций мы уже встречались (например, os.fork), но значительная часть этого программного кода основывается на концепциях Unix, разобраться с которыми должным образом в данной книге нам не позволит время. Тем не менее ниже приводится упрощенное описание системных вызовов, использованных в этом примере:
os.fork
Создает копию вызывающего процесса и возвращает числовой идентификатор ID дочернего процесса только родительскому процессу.
os.execvp
Затирает вызывающий процесс новой программой. Эта функция очень похожа на использовавшуюся выше функцию os.execlp, но принимает кортеж или список аргументов командной строки (в аргументе *args в заголовке функции).
os.pipe
Возвращает кортеж дескрипторов файлов, представляющих входной и выходной концы канала, как показано в приведенных ранее примерах.
os.close(fd)
Закрывает файл с дескриптором fd.
os.dup2(fd1, fd2)
Копирует всю системную информацию, связанную с файлом, заданным дескриптором fd1, в файл, заданный дескриптором fd2.
Что касается стандартных потоков ввода-вывода, самое важное место здесь занимает функция os.dup2. Например, вызов os.dup2(parentStdin, stdinFd) по сути присваивает дескриптор файла stdin родительского процесса входному концу одного из создаваемых каналов - все операции чтения из потока stdin с этого момента будут извлекать данные из канала. После соединения другого конца этого канала с копией файла потока stdout дочернего процесса посредством os.dup2(childStdout, stdoutFd) текст, выводимый дочерним процессом в его поток sdtdout, будет отправляться через канал в поток stdin родителя. По своему эффекту этот прием напоминает способ, которым мы соединяли потоки ввода-вывода с помощью модуля subprocess в главе 3, но этот сценарий менее переносим и действует на более низком уровне.
Для проверки этой утилиты в конце файла помещен программный код самотестирования, который запускает в дочернем процессе программу, приведенную в примере 5.23, и производит операции чтения и записи в стандартные потоки ввода-вывода, осуществляя обмен данными через два канала.
Пример 5.23. PP4E\System\Processes\pipes-testchild.py
import os, time, sys mypid = os.getpid() parentpid = os.getppid()
sys.stderr.write(‘Child %d of %d got arg: “%s”\n’ %
(mypid, parentpid, sys.argv[1]))
for i in range(2):
time.sleep(3) # приостановить родительский процесс
recv = input() # stdin связан с каналом: данные будут поступать из
# родительского потока вывода stdout
time.sleep(3)
send = ‘Child %d got: [%s]’ % (mypid, recv)
print(send) # stdout связан с каналом: данные будут поступать в
# родительский поток ввода stdin
sys.stdout.flush() # гарантировать отправку, иначе процесс заблокируется
Ниже приводятся результаты тестирования в Cygwin (напоминает Unix-подобные системы, такие как Linux). Вывод не производит большого впечатления, но показывает, как две программы выполняются независимо и обмениваются данными через каналы, управляемые операционной системой. Этот пример еще более напоминает модель клиент/сервер (если представить себе дочерний процесс как сервер, отвечающий на запросы родителя). Текст, заключенный в квадратные скобки, попал из родительского процесса в дочерний и вернулся обратно в родительский, и все это через каналы, подключенные к стандартным потокам ввода-вывода:
[C:\...\PP4E\System\Processes]$ python pipes.py Child 9228 of 9096 got arg: “spam”
Parent got: “Child 9228 got: [Hello 1 from parent 9096]”
Parent got: “Child 9228 got: [Hello 2 from parent 9096]”
Еще раз о буферизации потока вывода: взаимоблокировки и выталкивание буферов
Два процесса из примера в предыдущем разделе ведут простой диалог, но этого вполне достаточно, чтобы проиллюстрировать некоторые опасности, таящиеся в процедурах обмена данными между программами. Во-первых, отметим, что обе программы должны выводить сообщения в поток stderr - их потоки stdout подключены к потокам ввода другой программы. Поскольку процессы используют общие дескрипторы файлов, получается, что в родительском и в дочернем процессе stderr - это один и тот же поток, поэтому сообщения будут выводиться в одно и то же место.
Более тонкая особенность состоит в том, что и родительский, и дочерний процессы после вывода текста в поток stdout вызывают функцию sys.stdout.flush. Запрос ввода из канала обычно блокирует вызывающий процесс, если в канале нет данных, но в нашем примере из-за этого не должно возникать проблем, потому что запись производится столько же раз, сколько чтение на другом конце канала. Однако по умолчанию поток sys.stdout буферизуется, поэтому выведенный текст в действительности может оказаться переданным только через некоторое время (когда до конца будут заполнены буферы вывода). На практике, если принудительно не выталкивать содержимое буфера, оба процесса могут зависнуть в ожидании данных друг от друга - входных данных, находящихся в буфере и не сбрасываемых в канал. Это приводит к состоянию взаимоблокировки (deadlock), когда оба процесса блокируются в вызове функции input и ожидают события, которое никогда не произойдет.
С технической точки зрения, для потока вывода stdout по умолчанию используется режим построчной буферизации, когда он подключен к терминалу, а когда он подключается к другим устройствам, таким как файлы, сокеты или каналы, для него используется режим полной буферизации. Это объясняет, почему текст при выводе в окно консоли появляется на экране немедленно, а не когда процесс завершит работу или когда буфер вывода окажется заполнен, как в случаях, когда поток вывода подключен к какому-то другому устройству.
Буферизация выходных данных в действительности производится системными библиотеками, используемыми для доступа к каналам, а не самими каналами (каналы помещают выходные данные в очередь, но не скрывают их от чтения!). На самом деле в данном примере буферизация выполняется только потому, что мы передаем информацию для канала через sys.stdout - встроенный объект файла, по умолчанию выполняющий буферизацию. Однако такие аномалии могут происходить и при использовании других инструментов взаимодействия процессов.
В целом, когда программы ведут такого рода двусторонний диалог, избежать взаимоблокировки, связанной с буферизацией, можно несколькими способами:
• Выталкивание буферов: Как показано в примерах 5.22 и 5.23, выталкивание выходных буферов потоков вывода в канал с помощью метода flush объекта файла является простым способом принудительной очистки буферов. Для выталкивания выходного буфера потока вывода, используемого функцией print, используйте метод sys. stdout.flush.
• Аргументы: Как говорилось выше в этой главе, если вызывать интерпретатор Python с ключом -u командной строки, он отключит полную буферизацию потока вывода sys.stdout в выполняемых им программах. Запись любого непустого значения в переменную окружения PYTHONUNBUFFERED эквивалентна передаче этого ключа в команду запуска всех программ.
• Режимы открытия: Имеется также возможность использовать каналы в небуферизованном режиме. Для этого можно использовать низкоуровневые функции из модуля os для чтения и записи в дескрипторы канала или передавать в аргументе функции os.fdopen, определяющем размер буфера, значение 0 (небуферизованный режим) или 1 (режим построчной буферизации), чтобы отключить буферизацию в объекте файла, обертывающем дескриптор. Для управления режимом буферизации вывода в файлы, fifo (описываются в следующем разделе) можно также использовать аргументы функции open. Обратите внимание, что в Python 3.X полностью небуферизованный режим возможен только для двоичных файлов и невозможен для текстовых.
• Каналы, команд: Как упоминалось выше в этой главе, точно так же можно определять аргументы, управляющие буферизацией, для каналов командной строки, когда они создаются функциями os.popen и subprocess.Popen, но они воздействуют на конец канала в вызывающем процессе, и не влияют на режим буферизации в порожденных программах. Следовательно, этот прием не в состоянии предотвратить задержку вывода из последних, но может использоваться для передачи текстовых данных в каналы ввода других программ.
• Сокеты: Как мы увидим далее, функция socket.makefile принимает похожий аргумент, определяющий режим буферизации для сокетов (описываются далее в этой главе и книге), но в Python 3.X требует обязательную буферизацию для текстовых данных и, похоже, не поддерживает построчный режим буферизации (подробнее об этом в главе 12).
• Инструменты: Для решения более сложных задач можно также использовать высокоуровневые инструменты, которые фактически обманывают программу, заставляя ее полагать, что она подключена к терминалу. Эти инструменты предназначены для работы с программами не на языке Python, в которых невозможно организовать выталкивание буферов вручную или использовать ключ -u. Дополнительные подробности приводятся во врезке «Подробнее о буферизации потоков ввода-вывода: pty и Pexpect» ниже.
Использование дополнительных потоков выполнения позволяет избежать блокирования главного потока, управляющего графическим интерфейсом, но в действительности это решение лишь переносит проблему из одного места в другое (дочерний поток точно так же может оказаться заблокированным). Из предложенных решений, перечисленных выше, первые два - выталкивание буферов вручную и аргументы командной строки - часто являются наиболее простыми. Фактически, благодаря удобству в использовании, второй из перечисленных выше приемов заслуживает, чтобы сказать о нем несколько слов. Попробуйте следующее: закомментируйте все вызовы метода sys.stdout.flush в примерах 5.22 и 5.23 (в файлах pipes.py и pipes-testchild.py) и измените вызов функции, порождающий дочерний процесс в файле pipes.py, как показано ниже (то есть добавьте ключ -u командной строки):
spawn(‘python’, ‘-u’, ‘pipes-testchild.py’, ‘spam’)
После этого запустите программу с помощью командной строки python -u pipes.py. Работа будет происходить так же, как при выталкивании выходного буфера потока вывода stdout вручную, потому что теперь поток вывода stdout будет действовать в небуферизованном режиме.
Мы еще будем рассматривать эффекты, связанные с отсутствием буферизации потоков вывода, в главе 10, где напишем простой графический интерфейс, отображающий вывод программы командной строки, который будет приниматься через неблокирующий сокет и через канал в потоке выполнения. Еще раз, более подробно мы исследуем эту тему в главе 12, где будем использовать более универсальные способы перенаправления стандартных потоков ввода-вывода в сокеты. В целом, однако, взаимоблокировка представляет собой более обширную проблему, для полного исследования которой здесь недостаточно места. С другой стороны, если у вас достаточно знаний, чтобы пытаться использовать механизмы IPC в языке Python, то, наверное, вы уже ветеран войн со взаимоблокировками.
Анонимные каналы обеспечивают возможность общения процессов, связанных родственными узами, но они не подходят для программ, запускаемых независимо друг от друга. Чтобы обеспечить общение между такими программами, необходимо перейти к следующему разделу и исследовать механизмы, обладающие более широкой областью видимости.
Подробнее о буферизации потоков ввода-вывода: pty и Pexpect
В Unix-подобных системах для принудительного перевода потока стандартного вывода других программ в небуферизованный режим можно также использовать модуль pty из стандартной библиотеки Python, что особенно удобно, если эти другие программы написаны не на языке Python и вы не имеете возможности изменить их программный код.
Технически режим буферизации потока вывода stdout в других программах определяется за пределами интерпретатора Python, посредством проверки - подключен ли дескриптор потока вывода к терминалу. Эта проверка выполняется в стандартной библиотеке ввода-вывода файловой системы, и ее результаты не могут контролироваться порождаемыми программами.
В целом, когда стандартный поток вывода подключен к терминалу, для него используется режим построчной буферизации, а при подключении к другим устройствам (включая файлы, каналы и сокеты) используется режим полной буферизации. Такая политика применяется с целью повышения эффективности. Файлы и потоки ввода-вывода, создаваемые внутри сценариев на языке Python, следуют тем же правилам, но в них вы можете явно указать политику буферизации с помощью инструментов создания файлов.
Модуль pty фактически обманывает порожденную программу, заставляя ее думать, что она подключена к терминалу, благодаря чему в буфере потока вывода stdout сохраняется только одна строка. В результате появление в потоке вывода каждого нового символа перевода строки приводит к выталкиванию предыдущей строки из буфера, что типично для интерактивных программ и именно то, что нужно, если вы предполагаете получать строки по мере их воспроизведения.
Однако обратите внимание, что модуль pty не требуется применять для изменения режима буферизации потоков ввода-вывода при запуске сценариев на языке Python: просто используйте ключ -u командной строки, передавайте инструментам создания файлов аргументы, определяющие режим построчной буферизации, или вручную вызывайте метод sys.stdout.flush() в порождаемых программах. Кроме того, модуль pty на сегодняшний день доступен в Python не на всех платформах (он имеется в версии Python для Cygwin, но отсутствует в стандартной версии для Windows).
Пакет Pexpect, эквивалент программы expect для Unix на языке Python, использует модуль pty для обеспечения дополнительной функциональности и взаимодействий в обход стандартных потоков ввода-вывода (например, для ввода пароля). Более подробную информацию о модуле pty вы можете найти в руководстве по библиотеке Python, а также попробуйте поискать информацию о пакете Pexpect в Интернете.
Именованные каналы (fifo)
На некоторых платформах имеется возможность создавать каналы, существующие в виде настоящих файлов в файловой системе. Такие файлы называются именованными каналами (named pipes), или «fifo», так как они ведут себя в точности как каналы, которые создавались в программах из предыдущего раздела. Однако, вследствие того, что именованные каналы связаны с настоящими файлами, располагающимися на компьютере и являющимися внешними для любой программы, они никак не связаны с памятью, совместно используемой заданиями, и мо-
гут использоваться, как механизм взаимодействий между потоками, процессами и программами, запускаемыми независимо друг от друга.
После создания файла именованного канала процессы открывают его по имени и осуществляют чтение и запись в него с использованием обычных файловых операций. Fifo являются однонаправленными потоками. В типичной ситуации серверная программа читает данные из fifo, а одна или более клиентских программ записывают в него данные. Кроме того, для реализации двусторонней связи можно использовать группу из двух fifo, точно так же, как это делалось в предыдущем разделе с использованием анонимных каналов.
Так как именованные каналы являются файлами, они живут дольше, чем анонимные каналы внутри процессов, и к ним могут обращаться программы, запускаемые независимо. Приводившиеся выше примеры использования неименованных каналов основывались на том факте, что дескрипторы файлов (в том числе каналов) копируются в память дочерних процессов. Это осложняет использование анонимных каналов для организации взаимодействий программ, запускаемых независимо. С помощью же fifo доступ к каналам производится по имени файла, которое видят все программы независимо от наличия отношений родитель-потомок между процессами. Фактически, подобно обычным файлам, fifo обычно живут дольше программ, использующих их. Однако, в отличие от обычных файлов, операционная система синхронизирует доступ к fifo, что делает их идеальным механизмом IPC.
Благодаря этим отличиям именованные каналы лучше подходят в качестве универсального механизма IPC для независимых программ, взаимодействующих по схеме клиент/сервер. Например, постоянно выполняющаяся программа сервера может создавать каналы fifo и ждать из них запросы, поступающие от произвольных клиентов, а не только от тех, что могли бы быть порождены сервером. В некотором смысле, именованные каналы составляют альтернативу сокетам, с которыми мы встретимся в следующем разделе. Однако, в отличие от сокетов, при использовании каналов fifo нет прямой возможности устанавливать сетевые соединения с удаленными компьютерами, они не поддерживаются в версии Python для Windows на сегодняшний день, и для доступа к ним используется стандартный интерфейс для работы с файлами вместо более уникальных номеров портов и функций, которые мы будем изучать далее.
Основы именованных каналов
В Python файлы именованных каналов создаются с помощью функции os.mkfifo, которая доступна в настоящее время только в Unix-подобных системах и в версии Python для Cygwin в Windows, но недоступна в стандартной версии Python для Windows. Эта функция просто создает внешний файл - для отправки и получения данных через fifo его нужно открывать и обрабатывать, как стандартный файл.
Для иллюстрации в примере 5.24 приводится измененная версия сценария pipe2.py из примера 5.20, в которой вместо анонимных каналов используются именованные каналы. Как и сценарий pipe2.py, эта версия открывает в дочернем процессе канал fifo с помощью функции os.open в режиме двоичного доступа, а в родительском процессе - с помощью встроенной функции open, в текстовом режиме. Вообще говоря, на любом конце канала можно использовать любой из предложенных приемов, чтобы интерпретировать данные в канале как двоичные данные или как текст.
Пример 5.24. PP4E\System\Processes\pipefifo.py
именованные каналы; функция os.mkfifo недоступна в Windows (без Cygwin); здесь нет необходимости использовать прием ветвления процессов, потому что файлы каналов fifo являются внешними по отношению к процессам -- совместное использование дескрипторов файлов в родителе/потомке здесь неактуально;
import os, time, sys
fifoname = ‘/tmp/pipefifo’ # имена должны быть одинаковыми
def child():
pipeout = os.open(fifoname, os.O_WRONLY) # открыть fifo как дескриптор zzz = 0 while True:
time.sleep(zzz)
msg = (‘Spam %03d\n’ % zzz).encode() # был открыт в двоичном режиме os.write(pipeout, msg) zzz = (zzz+1) % 5
def parent():
pipein = open(fifoname, ‘r’) # открыть fifo как текстовый файл
while True:
line = pipein.readline()[:-1] # блокируется до отправки данных
print(‘Parent %d got “%s” at %s’ % (os.getpid(), line, time.time()))
if__name__== ‘__main__’:
if not os.path.exists(fifoname):
os.mkfifo(fifoname) # создать именованный канал
if len(sys.argv) == 1:
parent() # если нет аргументов - запустить как родительский процесс else: # иначе - как дочерний процесс
child()
Поскольку канал fifo существует независимо от родительского и дочернего процессов, нет никакой необходимости использовать прием ветвления процессов: дочерний процесс может быть запущен независимо от родительского и должен лишь открыть файл fifo с таким же именем. Ниже, например, в Cygwin, родитель запущен в одном окне командной строки, а потомок - в другом. Сообщения начинают появляться в окне родителя только после того, как потомок будет запущен и начнет записывать сообщения в файл fifo:
[C:\...\PP4E\System\Processes] $ python pipefifo.py # окно родителя
Parent 8324 got “Spam 000” at 1268003696.07
Parent 8324 got “Spam 001” at 1268003697.06
Parent 8324 got “Spam 002” at 1268003699.07
Parent 8324 got “Spam 003” at 1268003702.08
Parent 8324 got “Spam 004” at 1268003706.09
Parent 8324 got “Spam 000” at 1268003706.09
Parent 8324 got “Spam 001” at 1268003707.11
Parent 8324 got “Spam 002” at 1268003709.12
Parent 8324 got “Spam 003” at 1268003712.13
Parent 8324 got “Spam 004” at 1268003716.14
Parent 8324 got “Spam 000” at 1268003716.14
Parent 8324 got “Spam 001” at 1268003717.15
...и так далее: Ctrl-C для выхода...
[C:\...\PP4E\System\Processes]$ file /tmp/pipefifo # окно потомка
/tmp/pipefifo: fifo (named pipe)
[C:\...\PP4E\System\Processes]$ python pipefifo.py -child ...Ctrl-C для выхода...
Области применения именованных каналов
Благодаря отображению точек взаимодействий в файловую систему, доступную всем программам, выполняющимся на компьютере, именованные каналы способны решать самые разнообразные задачи взаимодействий между процессами на платформах, где они поддерживаются. Например, хотя сценарий, представленный в этом разделе, запускает независимые программы, именованные каналы могут также использоваться как механизм взаимодействий между потоками выполнения внутри процесса и между процессами, связанными отношением роди-тель/потомок, практически так же, как анонимные каналы.
Однако, благодаря поддержке взаимодействий независимых программ, файлы fifo могут найти более широкое применение в моделях взаимодействий клиент/сервер. Например, с помощью именованных каналов можно сделать связь отладчика командной строки с графическим интерфейсом, о реализации которой на основе анонимных каналов я рассказывал выше, более гибкой - при использовании файлов fifo для соединения потоков ввода-вывода графического интерфейса с потоками ввода-вывода отладчика командной строки, графический интерфейс можно было бы запускать независимо.
Подобную функциональность предоставляют сокеты, которые к тому же подкупают свойственной им возможностью передачи данных по
сети и переносимостью на платформу Windows, о чем рассказывается в следующем разделе.
Сокеты: первый взгляд
Сокеты, реализация которых на языке Python находится в модуле socket, представляют собой более универсальный механизм IPC, чем каналы, которые мы рассматривали перед этим. Сокеты позволяют передавать данные не только между программами, выполняющимися на одном и том же компьютере, но и между программами, выполняющимися на разных компьютерах, соединенных сетью. При использовании сокетов для реализации механизма взаимодействий между процессами, выполняющимися на одном и том же компьютере, программы подключаются к сокетам, используя глобальный для этого компьютера номер порта, и передают данные. При использовании сокетов для выполнения сетевых соединений программы указывают сетевое имя компьютера и номер порта и обмениваются данными с удаленными программами.