|
|||||||
Процессы и потоки Python
Время создания: 04.09.2017 13:25
Текстовые метки: knowledge
Раздел: Python - Потоки
Запись: xintrea/mytetra_db_mcold/master/base/1504520721z5j1o2puxa/text.html на raw.githubusercontent.com
|
|||||||
|
|||||||
Программирование на Python: Часть 9. Процессы и потоки Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование. Сергей Яковлев, Консультант, независимый специалист 02.09.2010 Разработайте и разверните ваше следующее приложение на облачной платформе IBM Bluemix. Начните работу с С появлением многоядерных процессоров стала общеупотребительной практика распространять нагрузку на все доступные ядра. Существует два основных подхода в распределении нагрузки: использование процессов и потоков. Использование нескольких процессов фактически означает использование нескольких программ, которые выполняются независимо друг от друга. Программно это решается с помощью системных вызовов exec и fork. Такой подход создает большие неудобства в управлении обмена данными между этими программами. В качестве альтернативы существует другой подход – создание многопоточных программ. Обмен данными между потоками существенно упрощается. Но управление такими программами усложняется, и вся ответственность ложится на программиста. Сегодня мы рассмотрим следующие темы.
В питоне есть стандартный модуль subprocess, который упрощает управление другими программами, передавая им опции командной строки и организуя обмен данными через каналы (pipe). Мы рассмотрим пример, в котором пользователь запускает программу из командной строки, которая в свою очередь запустит несколько дочерних программ. В данном примере два скрипта – рarent.py и child.py. Запускается parent.py. Child.py выступает в роли аргумента command, который передается в запускаемый процесс. У этого процесса есть стандартный вход, куда мы передаем два аргумента – поисковое слово и имя файла. Мы запустим два экземпляра программы child.py, каждый экземпляр будет искать слово word в своем файле – это будут файлы исходников самих программ. Запись на стандартный вход осуществляет модуль subprocess. Каждый процесс пишет результат своего поиска в консоль. В главном процессе мы ждем, пока все child не закончат свою работу. Код parent.py: import os import subprocess import sys child = os.path.join(os.path.dirname(__file__), "./child.py") word = 'word' file = ['./parent.py','./child.py'] pipes = [] for i in range(0,2): command = [sys.executable, child] pipe = subprocess.Popen(command, stdin=subprocess.PIPE) pipes.append(pipe) pipe.stdin.write(word.encode("utf8") + b"\n") pipe.stdin.write(file[i].encode("utf8") + b"\n") pipe.stdin.close() while pipes: pipe = pipes.pop() pipe.wait() Код child.py: import sys word = sys.stdin.readline().rstrip() filename = sys.stdin.readline().rstrip() try: with open(filename, "rb") as fh: while True: current = fh.readline() if not current: break if (word in current ): print("find: {0} {1}".format(filename,word)) except : pass 2. Как работают потоки в питоне Если нужно, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр. Когда в одной программе работают несколько потоков, возникает проблема разграничения доступа потоков к общим данным. Предположим, что есть два потока, имеющих доступ к общему списку. Первый поток может делать итерацию по этому списку: for x in L а второй в этот момент начнет удалять значения из этого списка. Тут может произойти все что угодно: программа может упасть, или мы просто получим неверные данные. Решением в этом случае является применение блокировки. При этом доступ к заблокированному списку будет иметь только один поток, второй будет ждать, пока блокировка не будет снята. Применение блокировки порождает другую проблему – дедлок (deadlock) – мертвая блокировка. Пример дедлока: имеется два потока и два списка. Первый поток блокирует первый список, второй поток блокирует второй список. Первый поток изнутри первой блокировки пытается получить доступ к уже заблокированному второму списку, второй поток пытается проделать то же самое с первым списком. Получается неопределенная ситуация с бесконечным ожиданием. Эту ситуации легко описать, на практике все гораздо сложнее. Вариантом решения проблемы дедлоков является политика определения очередности блокировок. Например, в предыдущем примере мы должны определить, что блокировка первого списка идет всегда первой, а уже потом идет блокировка второго списка. Другая проблема с блокировками – в том, что несколько потоков могут одновременно ждать доступа к уже заблокированному ресурсу и при этом ничего не делать. Каждая питоновская программа всегда имеет главный управляющий поток. Питоновская реализация многопоточности ограниченная. Интерпретатор питона использует внутренний глобальный блокировщик (GIL), который позволяет выполняться только одному потоку. Это сводит на нет преимущества многоядерной архитектуры процессоров. Для многопоточных приложений, которые работают в основном на дисковые операции чтения/записи, это не имеет особого значения, а для приложений, которые делят процессорное время между потоками, это является серьезным ограничением. Для создания потоков мы будем использовать стандартный модуль threading. Есть два варианта создания потоков: вызов функции threading.Thread() вызов класса threading.Thread Следующий пример показывает, как к потоку приаттачить функцию через вызов функции: import threading import time def clock(interval): while True: print("The time is %s" % time.ctime()) time.sleep(interval) t = threading.Thread(target=clock, args=(15,)) t.daemon = True t.start() Пример на создание потока через вызов класса: в конструкторе обязательно нужно вызвать конструктор базового класса. Для запуска потока нужно выполнить метод start() объекта-потока, что приведет к выполнению действий в методе run(): import threading import time class ClockThread(threading.Thread): def __init__(self,interval): threading.Thread.__init__(self) self.daemon = True self.interval = interval def run(self): while True: print("The time is %s" % time.ctime()) time.sleep(self.interval) t = ClockThread(15) t.start() Для управления потоками существуют методы: start() – дает потоку жизнь. run() –этот метод представляет действия, которые должны быть выполнены в потоке. join([timeout]) – поток, который вызывает этот метод, приостанавливается, ожидая завершения потока, чей метод вызван. Параметр timeout (число с плавающей точкой) позволяет указать время ожидания (в секундах), по истечении которого приостановленный поток продолжает свою работу независимо от завершения потока, чей метод join был вызван. Вызывать join()некоторого потока можно много раз. Поток не может вызвать метод join() самого себя. Также нельзя ожидать завершения еще не запущенного потока. getName() – возвращает имя потока. setName(name) – присваивает потоку имя name. isAlive() – возвращает истину, если поток работает (метод run() уже вызван). isDaemon() – возвращает истину, если поток имеет признак демона. setDaemon(daemonic) – устанавливает признак daemonic того, что поток является демоном. activeCount() – возвращает количество активных в настоящий момент экземпляров класса Thread. Фактически это len(threading.enumerate()). currentThread() – возвращает текущий объект-поток, т.е. соответствующий потоку управления, который вызвал эту функцию. enumerate() – возвращает список активных потоков. В следующем примере будет решена аналогичная задача, что и в предыдущем примере с процессами: будут запущены три потока, каждый из которых будет работать по принципу утилиты grep. Имеется глобальный ресурс – work_queue – список файлов для поиска, который мы положим в очередь. Для этого будет использован объект Queue, который имеет встроенную блокировку: import threading import Queue class Worker(threading.Thread): def __init__(self, work_queue, word): super(Worker,self).__init__() self.work_queue = work_queue self.word = word def run(self): try: filename = self.work_queue.get() self.process(filename) finally: pass def process(self, filename): previous = " current=True with open(filename, "rb") as fh: while current: current = fh.readline() if not current: break current = current.decode("utf8", "ignore") if self.word in current : print("find {0}: {1}".format(self.word,filename)) previous = current word = 'import' filelist = ['./file1.py','./file2.py','./file3.py'] work_queue = Queue.Queue() for filename in filelist: work_queue.put(filename) for i in range(3): worker = Worker(work_queue, word) worker.start() В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – url_list – доступ к которому будет блокироваться с помощью блокировки threading.Lock(). Объект Lock имеет методы: acquire([blocking=True]) – делает запрос на запирание замка. Если параметр blocking не указан или является истиной, то поток будет ожидать освобождения замка. Если параметр не был задан, метод не возвратит значения. Если blocking был задан и истинен, метод возвратит True (после успешного овладения замком). Если блокировка не требуется (т.е. задан blocking=False), метод вернет True, если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено False. release() – запрос на отпирание замка. locked() – возвращает текущее состояние замка (True – заперт, False – открыт). import threading from urllib import urlopen class WorkerThread(threading.Thread): def __init__(self,url_list,url_list_lock): super(WorkerThread,self).__init__() self.url_list=url_list self.url_list_lock=url_list_lock
def run(self): while (1): nexturl = self.grab_next_url() if nexturl==None:break self.retrieve_url(nexturl)
def grab_next_url(self): self.url_list_lock.acquire(1) if len(self.url_list)<1: nexturl=None else: nexturl = self.url_list[0] del self.url_list[0] self.url_list_lock.release() return nexturl
def retrieve_url(self,nexturl): text = urlopen(nexturl).read() print text print '################### %s #######################' % nexturl
url_list=['http://linux.org.ru','http://kernel.org','http://python.org'] url_list_lock = threading.Lock() workerthreadlist=[] for x in range(0,3): newthread = WorkerThread(url_list,url_list_lock) workerthreadlist.append(newthread) newthread.start() for x in range(0,3): workerthreadlist[x].join() Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование. В обычной программе действует всего один поток управления, а в многопоточной одновременно могут работать несколько потоков. В многопоточной программе усложняется контроль за обменом данных между потоками. Глобальные ресурсы необходимо предохранять от одновременного доступа со стороны нескольких потоков, чтобы не нарушить их целостности. В этой статье были рассмотрены инструменты контроля глобальных данных – блокировки, очереди. Приведенные примеры проверялись на версии питона 2.6. |
|||||||
Так же в этом разделе:
|
|||||||
|
|||||||
|