MyTetra Share
Делитесь знаниями!
Процессы и потоки Python
Время создания: 04.09.2017 13:25
Текстовые метки: knowledge
Раздел: Python - Потоки
Запись: xintrea/mytetra_db_mcold/master/base/1504520721z5j1o2puxa/text.html на raw.githubusercontent.com

Программирование на Python: Часть 9. Процессы и потоки

Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование.

0 Комментарии

Сергей Яковлев, Консультант, независимый специалист

02.09.2010

  • Содержание

Разработайте и разверните ваше следующее приложение на облачной платформе IBM Bluemix.

Начните работу с
бесплатной
пробной версией

С появлением многоядерных процессоров стала общеупотребительной практика распространять нагрузку на все доступные ядра. Существует два основных подхода в распределении нагрузки: использование процессов и потоков.

Использование нескольких процессов фактически означает использование нескольких программ, которые выполняются независимо друг от друга. Программно это решается с помощью системных вызовов exec и fork. Такой подход создает большие неудобства в управлении обмена данными между этими программами.

В качестве альтернативы существует другой подход – создание многопоточных программ. Обмен данными между потоками существенно упрощается. Но управление такими программами усложняется, и вся ответственность ложится на программиста.

Сегодня мы рассмотрим следующие темы.

  • Как работают процессы.
  • Как работают потоки в питоне.
  • Создание потока.
  • Очереди (Queue).
  • Блокировки (Lock).

1. Как работают процессы

В питоне есть стандартный модуль subprocess, который упрощает управление другими программами, передавая им опции командной строки и организуя обмен данными через каналы (pipe). Мы рассмотрим пример, в котором пользователь запускает программу из командной строки, которая в свою очередь запустит несколько дочерних программ. В данном примере два скрипта – рarent.py и child.py. Запускается parent.py. Child.py выступает в роли аргумента command, который передается в запускаемый процесс. У этого процесса есть стандартный вход, куда мы передаем два аргумента – поисковое слово и имя файла. Мы запустим два экземпляра программы child.py, каждый экземпляр будет искать слово word в своем файле – это будут файлы исходников самих программ. Запись на стандартный вход осуществляет модуль subprocess. Каждый процесс пишет результат своего поиска в консоль. В главном процессе мы ждем, пока все child не закончат свою работу.

Код parent.py:

import os

import subprocess

import sys



child = os.path.join(os.path.dirname(__file__), "./child.py")

word = 'word'

file = ['./parent.py','./child.py']


pipes = []

for i in range(0,2):

command = [sys.executable, child]

pipe = subprocess.Popen(command, stdin=subprocess.PIPE)

pipes.append(pipe)

pipe.stdin.write(word.encode("utf8") + b"\n")

pipe.stdin.write(file[i].encode("utf8") + b"\n")

pipe.stdin.close()


while pipes:

pipe = pipes.pop()

pipe.wait()

Код child.py:

import sys


word = sys.stdin.readline().rstrip()

filename = sys.stdin.readline().rstrip()


try:

with open(filename, "rb") as fh:

while True:

current = fh.readline()

if not current:

break

if (word in current ):

print("find: {0} {1}".format(filename,word))

except :

pass

В начало

2. Как работают потоки в питоне

Если нужно, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр.

Когда в одной программе работают несколько потоков, возникает проблема разграничения доступа потоков к общим данным. Предположим, что есть два потока, имеющих доступ к общему списку. Первый поток может делать итерацию по этому списку:

for x in L

а второй в этот момент начнет удалять значения из этого списка. Тут может произойти все что угодно: программа может упасть, или мы просто получим неверные данные.

Решением в этом случае является применение блокировки. При этом доступ к заблокированному списку будет иметь только один поток, второй будет ждать, пока блокировка не будет снята.

Применение блокировки порождает другую проблему – дедлок (deadlock) – мертвая блокировка. Пример дедлока: имеется два потока и два списка. Первый поток блокирует первый список, второй поток блокирует второй список. Первый поток изнутри первой блокировки пытается получить доступ к уже заблокированному второму списку, второй поток пытается проделать то же самое с первым списком. Получается неопределенная ситуация с бесконечным ожиданием. Эту ситуации легко описать, на практике все гораздо сложнее.

Вариантом решения проблемы дедлоков является политика определения очередности блокировок. Например, в предыдущем примере мы должны определить, что блокировка первого списка идет всегда первой, а уже потом идет блокировка второго списка.

Другая проблема с блокировками – в том, что несколько потоков могут одновременно ждать доступа к уже заблокированному ресурсу и при этом ничего не делать. Каждая питоновская программа всегда имеет главный управляющий поток.

Питоновская реализация многопоточности ограниченная. Интерпретатор питона использует внутренний глобальный блокировщик (GIL), который позволяет выполняться только одному потоку. Это сводит на нет преимущества многоядерной архитектуры процессоров. Для многопоточных приложений, которые работают в основном на дисковые операции чтения/записи, это не имеет особого значения, а для приложений, которые делят процессорное время между потоками, это является серьезным ограничением.

В начало

3. Создание потока

Для создания потоков мы будем использовать стандартный модуль threading. Есть два варианта создания потоков:

вызов функции

threading.Thread()

вызов класса

threading.Thread

Следующий пример показывает, как к потоку приаттачить функцию через вызов функции:

import threading

import time

def clock(interval):

while True:

print("The time is %s" % time.ctime())

time.sleep(interval)

t = threading.Thread(target=clock, args=(15,))

t.daemon = True

t.start()

Пример на создание потока через вызов класса: в конструкторе обязательно нужно вызвать конструктор базового класса. Для запуска потока нужно выполнить метод start() объекта-потока, что приведет к выполнению действий в методе run():

import threading

import time

class ClockThread(threading.Thread):

def __init__(self,interval):

threading.Thread.__init__(self)

self.daemon = True

self.interval = interval

def run(self):

while True:

print("The time is %s" % time.ctime())

time.sleep(self.interval)

t = ClockThread(15)

t.start()

Для управления потоками существуют методы:

start() – дает потоку жизнь.

run() –этот метод представляет действия, которые должны быть выполнены в потоке.

join([timeout]) – поток, который вызывает этот метод, приостанавливается, ожидая завершения потока, чей метод вызван. Параметр timeout (число с плавающей точкой) позволяет указать время ожидания (в секундах), по истечении которого приостановленный поток продолжает свою работу независимо от завершения потока, чей метод join был вызван. Вызывать join()некоторого потока можно много раз. Поток не может вызвать метод join() самого себя. Также нельзя ожидать завершения еще не запущенного потока.

getName() – возвращает имя потока.

setName(name) – присваивает потоку имя name.

isAlive() – возвращает истину, если поток работает (метод run() уже вызван).

isDaemon() – возвращает истину, если поток имеет признак демона.

setDaemon(daemonic) – устанавливает признак daemonic того, что поток является демоном.

activeCount() – возвращает количество активных в настоящий момент экземпляров класса Thread. Фактически это len(threading.enumerate()).

currentThread() – возвращает текущий объект-поток, т.е. соответствующий потоку управления, который вызвал эту функцию.

enumerate() – возвращает список активных потоков.

В начало

4. Очереди (Queue)

В следующем примере будет решена аналогичная задача, что и в предыдущем примере с процессами: будут запущены три потока, каждый из которых будет работать по принципу утилиты grep. Имеется глобальный ресурс – work_queue – список файлов для поиска, который мы положим в очередь. Для этого будет использован объект Queue, который имеет встроенную блокировку:

import threading

import Queue


class Worker(threading.Thread):


def __init__(self, work_queue, word):

super(Worker,self).__init__()

self.work_queue = work_queue

self.word = word


def run(self):

try:

filename = self.work_queue.get()

self.process(filename)

finally:

pass


def process(self, filename):

previous = "

current=True

with open(filename, "rb") as fh:

while current:

current = fh.readline()

if not current: break

current = current.decode("utf8", "ignore")

if self.word in current :

print("find {0}: {1}".format(self.word,filename))

previous = current


word = 'import'

filelist = ['./file1.py','./file2.py','./file3.py']

work_queue = Queue.Queue()

for filename in filelist:

work_queue.put(filename)

for i in range(3):

worker = Worker(work_queue, word)

worker.start()

В начало

5. Блокировки (Lock)

В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – url_list – доступ к которому будет блокироваться с помощью блокировки threading.Lock(). Объект Lock имеет методы:

acquire([blocking=True]) – делает запрос на запирание замка. Если параметр blocking не указан или является истиной, то поток будет ожидать освобождения замка.

Если параметр не был задан, метод не возвратит значения.

Если blocking был задан и истинен, метод возвратит True (после успешного овладения замком).

Если блокировка не требуется (т.е. задан blocking=False), метод вернет True, если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено False.

release() – запрос на отпирание замка.

locked() – возвращает текущее состояние замка (True – заперт, False – открыт).

import threading

from urllib import urlopen


class WorkerThread(threading.Thread):

def __init__(self,url_list,url_list_lock):

super(WorkerThread,self).__init__()

self.url_list=url_list

self.url_list_lock=url_list_lock

def run(self):

while (1):

nexturl = self.grab_next_url()

if nexturl==None:break

self.retrieve_url(nexturl)

def grab_next_url(self):

self.url_list_lock.acquire(1)

if len(self.url_list)<1:

nexturl=None

else:

nexturl = self.url_list[0]

del self.url_list[0]

self.url_list_lock.release()

return nexturl

def retrieve_url(self,nexturl):

text = urlopen(nexturl).read()

print text

print '################### %s #######################' % nexturl

url_list=['http://linux.org.ru','http://kernel.org','http://python.org']

url_list_lock = threading.Lock()

workerthreadlist=[]

for x in range(0,3):

newthread = WorkerThread(url_list,url_list_lock)

workerthreadlist.append(newthread)

newthread.start()

for x in range(0,3):

workerthreadlist[x].join()

В начало

Заключение

Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование. В обычной программе действует всего один поток управления, а в многопоточной одновременно могут работать несколько потоков.

В многопоточной программе усложняется контроль за обменом данных между потоками. Глобальные ресурсы необходимо предохранять от одновременного доступа со стороны нескольких потоков, чтобы не нарушить их целостности. В этой статье были рассмотрены инструменты контроля глобальных данных – блокировки, очереди.

Приведенные примеры проверялись на версии питона 2.6.

 
MyTetra Share v.0.67
Яндекс индекс цитирования