Управление процессами в python

subprocess

Предоставляет АПИ, позволяющий создавать дополнительные процессы и обмениваться данными с ними. Модуль поддерживает обмен через стандартные каналы ввода-вывода, поэтому удобен для работы с текстом.

Предоставляются три основных интрумента:

Функционал, предоставленный модулем, заменяет собой os.system(), os.spawn(), версии popen() из os, а также модуль commands. Подробнее об этом

Доступно:

signal

Реализует ассинхронный обмен данными между процессами и позволяет создавать обработчики для событий.

Сигналы - это средство операционнй системы, обеспечтвающее доставку уведомлений о наступлении событий и ассинхронную обработку уведомлений. Сигналы можно посылать из процесса в процесс.

Сигналы прерывают ход выполнения программы. что может вести к ошибкам (особенно это касается ввода-вывода), если сигнал получен в момент выполнения.

Формат сигналов специфичен платформе и определяется в заголовочных файлах C операционной системы.

Обработчик сигналов Python не выполняется внутри обработчика сигналов низкого уровня (C). Вместо этого обработчик сигналов низкого уровня устанавливает флаг, который сообщает виртуальной машине выполнить соответствующий обработчик сигнала Python в более поздний момент (например, в следующей инструкции байт-кода). Это приводит к некоторым нюансам в обработке ошибок

Обработчики сигналов Python всегда выполняются в основном потоке Python основного интерпретатора, даже если сигнал был получен в другом потоке. Это означает, что сигналы нельзя использовать как средство межпоточного взаимодействия. Кроме того, только основной поток основного интерпретатора может устанавливать новый обработчик сигналов.

Пример

import signal, os

def handler(signum, frame):
    print('Signal handler called with signal', signum)
    raise OSError("Couldn't open device!")

# Set the signal handler and a 5-second alarm
signal.signal(signal.SIGALRM, handler)
signal.alarm(5)

# This open() may hang indefinitely
fd = os.open('/dev/ttyS0', os.O_RDWR)

signal.alarm(0)          # Disable the alarm

multiprocessing

Позволяет инициировать процессы на разных ядрах в обход [gil]. АПИ модуля идентично [threading]

Поддерживается три метода создания процессов:

Установить метод можно так (пример из документации):

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

или получив объект контекста через get_context(), апи которого идентично апи модуля, что позволяет использовать несколько контекстов одновременно (пример из документации):

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Объекты из разных контекстов, особенно те, кторые реализуют ожидание с блокировкой, могут быть несовместимы с другими контекстами.

Process

Процессы создаются с помощью Process, который реализует апи, аналогичный threading:

Кроме того, поддерживается передача аргументов в том же самом виде, что и в threading за исключением того, что объекты, передаваемые процессам должны иметь возможность быть сериализованными с помощью pickle (смотри [data-storage-python]).

import multiprocessing

def worker(num):
    """thread worker function"""
    print('Worker:', num)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

Дочерний процесс должен иметь возможность импортировать сценарий, воркера. Это порождает проблему рекурсивного выполнения при импорте. Использование конструкции if __name__ == "__main__": в месте запуска процессов гарантирует, что воркер не будет исполнен при импорте.

Можно создавать дочерние классы от Process переопределяя метод run()

Аналогично threading мы можем определить текущий процесс по имени:

import multiprocessing

def worker():
    name = multiprocessing.current_process().name
    print(name)

Процесс можно “убить” с помощью terminate() - этим не стоит злоупотреблять и стоит делать, только в ситуациях вероятного зависания процесса или при взаимоблокировке. После terminate() необходимо вызвать join() для процесса, чтобы multiprocessing успел обновить состояние посел преждевременного завершения (иначе мы рискуем завершить основную программу с ошибками). Кроме того, если этот метод используется, когда связанный процесс использует пайплайн, очередь или другие объекты, подразумевающие ожидание - это может привести к повреждению этих объектов. Метод kill() является аналогом, использующим другой код выхода.

Метод close() высвобождает все ресурсы, ассоциированные с процессом. Это становится полезным при создании пулов.

Объекты Pipes и Queue

В модуле реализованы объекты Pipe, Queue, SimpleQueue и JoinableQueue. Все они реализуют логику разделения процессов и передачи сообщений между процессами.

Пример очереди процессов, возврпащающей данные сосновному рабочему процессу. В данном случае для принудительной остановки используются стоп-объекты для каждого рабочего процесса, которые добавляются в очередь. Когда рабочему процессу встречается такой объект - он завершается. Метод join() очереди используется, чтобы дождаться завершения всех процессов перед обработкой результата.

Оюмен сигналами между процессами

Аналогично [threading] реализованы следующие объекты:

Разделяемые типы

Есть возможность создавать объекты, разделяющие общую память для дочерних процессов.

Managers

Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.

В основном придется использовать:

В данном простом примере общий словарь, предоставленный посредством Manager() заполняется разными процессами. Естественно порядок вставки не гарантируется, т.к. процессы конкурируют за ресурсы процессора.

import multiprocessing

def worker(d, key, value):
    d[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print(d)

{0: 0, 3: 6, 1: 2, 2: 4, 4: 8, 5: 10, 7: 14, 6: 12, 8: 16, 9: 18}

Кроме того, можно вызвать объект Namespace менеджера в котором реализовывать разделяемые объекты

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Важно - обновление содержимого изменяемых типов не рьбновляется автоматически в простарнстве имен. Такие типы как list() придется переопределять.

Примеры кастомизации и использования менеджеров удаленно тут

Наконец можно создавать прокси-обэекты - это объекты которые относятся к разделяемому (общему) объекту, который (предположительно) живет в другом процессе. Общий объект называется референтом прокси. У нескольких прокси-объектов может быть один и тот же референт. Прокси-объект имеет методы, которые вызывают соответствующие методы его референта (хотя не каждый метод референта обязательно будет доступен через прокси). Таким образом, прокси можно использовать так же, как и его референт. Важная особенность - прокси-объект сериализуем и его можно передавать между процессами.

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Больше примеров тут

Pools

Класс Pool обеспечивает управление фиксированным количеством процессов, когда работу можно разбить на независимые части. Строится список, который заполняется при завершении процессов, а затем возвращается. Создавая пул, можно задать количество процессов и функцию, которая будет производить работую, которая будет вызвана однократно каждым процессом при запуске.

Pool поддерживает асинхронные расчеты с таймаутами и обратными вызовами и имеет реализацию map. Если для процессов установлено значение None, то используется число, возвращаемое os.cpu_count(). maxtasksperchild - это количество задач, которые рабочий процесс может выполнить до того, как он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. Это полезно в случаях, когда есть медленные и быстрые задачи, чтобы общее время вычислений не зависело от того, насколько процесс с медленными задачами доминирует в общем времени исполнения. По умолчанию для maxtasksperchild установлено значение None, что означает, что рабочие процессы будут жить столько же, сколько и пул. context можно использовать для указания контекста, используемого для запуска рабочих процессов.

Методы объекта пула должны вызываться только процессом, создавшим пул.

Предупреждение Объекты multiprocessing.pool имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любой другой ресурс), используя пул в качестве диспетчера контекста или вызывая close() и terminate() вручную. Невыполнение этого требования может привести к зависанию процесса при завершении. Крмое того, CPython не гарантирует, что сборщик мусора завершит и удалит пул.

Пример (в данном случае задействован двойной доступный пул ядер):

import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    inputs = list(range(10))
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print(pool_outputs)

Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-14
Starting ForkPoolWorker-13
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Starting ForkPoolWorker-18
Starting ForkPoolWorker-17
Starting ForkPoolWorker-19
Starting ForkPoolWorker-20
Starting ForkPoolWorker-21
Starting ForkPoolWorker-22
Starting ForkPoolWorker-23
Starting ForkPoolWorker-24
Starting ForkPoolWorker-25
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Что посмотреть еще?

Предоставляется внутренний логгер, доступный через log_to_stderr()

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Подробный гайд как работать с множественными процессами и примеры смотри здесь. Если вкратце:

Пример последнего утверждения (здесь следует удалить джойн или поменять переместить его вниз):

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Примеры с использованием менеджера и проксей, пула и очередей

[python-standart-library]

Смотри еще:

>>> На главную