Давайте синхронизировать потоки в Python

Python

Для меня это был волшебный момент, внезапный инсайт, когда я впервые узнал о многопоточности. Меня восхитила сама возможность параллельного выполнения действий, (хотя важно заметить, что на компьютере с одноядерным процессором вычисления выполняются не строго параллельно, причем вычисления в Python распараллеливаются частично из-за наличия GIL-концепции ‑ способа синхронизации потоков в Python. Многопоточность открывает новые возможности для вычислений, но вслед за могуществом приходит и ответственность.

Имеется ряд проблем, возникающих при использовании многопоточности – попытка множества потоков получить доступ к одному и тому же фрагменту данных может привести к проблемам несовместимости или получению искаженной информации (например, фраза  HWeolrldo вместо  Hello World  на консоли). Подобные проблемы возникают, когда компьютеру не указан способ организации потоков.

Как правильно приказать компьютеру синхронизировать потоки? Для этого используются примитивы синхронизации — простые программные механизмы, обеспечивающие гармоничное взаимодействие потоков друг с другом.

В этом посте представлены некоторые популярные примитивы синхронизации в Python, определенные в стандартном модуле  threading.py. Большинство методов блокировки (то есть методов, блокирующих выполнение конкретного потока до тех пор, пока не выполнится условие) этих примитивов предоставляют дополнительные функции тайм-аута, но для простоты изложения они не будут здесь упоминаться. Также ради простоты описаны только основные функции этих объектов. Предполагается, что читатель обладает базовыми знаниями многопоточности в Python.

Изучим  LocksRLocksSemaphoresEventsConditionsи Barriers. Разумеется, можно создавать собственные примитивы пользовательской синхронизации, используя описанные мной в качестве подклассов. Начнем с Locks как с простейшего из примитивов и постепенно перейдем к более сложным.

Locks

Примитивы Lock  вероятно, простейшие примитивы в Python. Для Lock возможны только два состояния ‑ заблокирован и разблокирован. Примитив создается в разблокированном состоянии и содержит два метода –  acquire() и  release().  Метод  acquire() блокирует Lock и выполнение блока до тех пор, пока метод  release() из другой сопрограммы не разблокирует его. Затем он снова блокирует Lock и возвращает значение True. Метод  release() вызывается только в заблокированном состоянии – устанавливает состояние разблокировки и немедленно возвращает управление. Вызов  release() в разблокированном состоянии приводит к RunTimeError.

Вот код, который использует примитив Lock для безопасного доступа к общей переменной:

#lock_tut.py
from threading import Lock, Thread
lock = Lock()
g = 0

def add_one():
   """
   Just used for demonstration. It’s bad to use the ‘global’
   statement in general.
   """
   
   global g
   lock.acquire()
   g += 1
   lock.release()

def add_two():
   global g
   lock.acquire()
   g += 2
   lock.release()

threads = []
for func in [add_one, add_two]:
   threads.append(Thread(target=func))
   threads[-1].start()

for thread in threads:
   """
   Waits for threads to complete before moving on with the main
   script.
   """
   thread.join()

print(g)

Этот код просто дает результат в виде числа 3, но теперь мы уверены, что две функции не изменяют значение глобальной переменной g одновременно, хотя работают в двух разных потоках. Таким образом, Lock  могут использоваться для предотвращения противоречивости в выходных данных, позволяя каждый раз только одному потоку изменять данные.

RLocks

Стандартный Lock не знает, какой поток блокируется в данный момент. Если блокировка сохраняется, блокируется любой из потоков, пытающихся получить доступ, даже если этот тот же самый поток, который уже удерживает блокировку. Именно для таких случаев и используется RLock — блокировка повторного входа. Вы можете расширить код в следующем фрагменте, добавив выходные инструкции для демонстрации возможностей RLock предотвращать нежелательную блокировку.

#rlock_tut.py
import threading

num = 0
lock = Threading.Lock()

lock.acquire()
num += 1
lock.acquire() # This will block.
num += 2
lock.release()


# With RLock, that problem doesn’t happen.
lock = Threading.RLock()

lock.acquire()
num += 3
lock.acquire() # This won’t block.
num += 4
lock.release()
lock.release() # You need to call release once for each call to acquire.

Возможно рекурсивное использование RLock — когда родительский вызов функции блокирует вложенный вызов. Таким образом  RLock используются для вложенного доступа к общим ресурсам.

Семафоры

Семафоры – это просто дополнительные счетчики. Вызов acquire() будет блокироваться семафором только после превышении определенного количества запущенных потоков acquire(). Значение соответствующего счетчика уменьшается на каждый вызов на acquire() и увеличивается на каждый вызов release(). Значение ValueError будет возникать, если вызовы release() будут пытаться увеличивать значение счетчика после достижения заданного максимального значения (количества потоков, которые допустимые семафором acquire() до применения блокировки). Следующий код демонстрирует использование семафоров для простой задачи производитель-потребитель.

#semaphores_tut.py
import random, time
from threading import BoundedSemaphore, Thread
max_items = 5
"""
Consider 'container' as a container, of course, with a capacity of 5
items. Defaults to 1 item if 'max_items' is passed.
"""
container = BoundedSemaphore(max_items)
def producer(nloops):
    for i in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        try:
            container.release()
            print("Produced an item.")
        except ValueError:
            print("Full, skipping.")
def consumer(nloops):
    for i in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        """
        In the following if statement we disable the default
        blocking behaviour by passing False for the blocking flag.
        """
        if container.acquire(False):
            print("Consumed an item.")
        else:
            print("Empty, skipping.")
threads = []
nloops = random.randrange(3, 6)
print("Starting with %s items." % max_items)
threads.append(Thread(target=producer, args=(nloops,)))
threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),)))
for thread in threads:  # Starts all the threads.
    thread.start()
for thread in threads:  # Waits for threads to complete before moving on with the main script.
    thread.join()
print("All done.")

Модуль  threading также предоставляет простой класс Semaphore. Класс Semaphore предоставляет счетчик, позволяющий вызывать release() произвольное количество раз. Однако, чтобы избежать ошибок при программировании, лучше использовать BoundedSemaphore, который вызывает ошибку, если вызов release() пытается увеличивать значение счетчика выше заданного максимального значения.

Семафоры, как правило, используются для ограничения ресурсов, например, ограничения доступа к серверу, допуская обрабатывать только 10 клиентов за раз. В этом случае несколько потоков соединений конкурируют за ограниченный ресурс (в нашем примере это сервер).

Events

Примитив синхронизации Event работает как простой коммуникатор между потоками. Он использует внутренний флаг, который потоки могут устанавливать set() или сбрасывать clear().  Другие потоки могут ожидать  wait() установки внутреннего флага set(). Метод  wait()  блокирует пока флаг не станет истинным. Следующий фрагмент демонстрирует, как Event могут использоваться для запуска действий.

#event_tut.py
import random, time
from threading import Event, Thread

event = Event()

def waiter(event, nloops):
    for i in range(nloops):
    print(“%s. Waiting for the flag to be set.” % (i+1))
    event.wait() # Blocks until the flag becomes true.
    print(“Wait complete at:”, time.ctime())
    event.clear() # Resets the flag.
    print()

def setter(event, nloops):
    for i in range(nloops):
    time.sleep(random.randrange(2, 5)) # Sleeps for some time.
    event.set()

threads = []
nloops = random.randrange(3, 6)

threads.append(Thread(target=waiter, args=(event, nloops)))
threads[-1].start()
threads.append(Thread(target=setter, args=(event, nloops)))
threads[-1].start()

for thread in threads:
    thread.join()

print(“All done.”)

Conditions

Объект Condition является просто усовершенствованным вариантом объекта Event. Он тоже работает как коммуникатор между потоками и может применяться для уведомления notify() других потоков об изменении состояния программы. Например, его можно использовать для сигнализации доступности ресурса. Другие потоки также должны получать условие acquire() (и, следовательно, связанное с ним блокирование) до ожидания wait() для удовлетворения условия. Кроме того, поток должен освободить release() по условию Condition после завершения связанных с ним действий, так что другие потоки могут получить условие для своих целей. Нижеследующий код демонстрирует реализацию другой простой проблемы производитель-потребитель с помощью объекта Condition.

#condition_tut.py
import random, time
from threading import Condition, Thread
"""
'condition' variable will be used to represent the availability of a produced
item.
"""
condition = Condition()
box = []
def producer(box, nitems):
    for i in range(nitems):
        time.sleep(random.randrange(2, 5))  # Sleeps for some time.
        condition.acquire()
        num = random.randint(1, 10)
        box.append(num)  # Puts an item into box for consumption.
        condition.notify()  # Notifies the consumer about the availability.
        print("Produced:", num)
        condition.release()
def consumer(box, nitems):
    for i in range(nitems):
        condition.acquire()
        condition.wait()  # Blocks until an item is available for consumption.
        print("%s: Acquired: %s" % (time.ctime(), box.pop()))
        condition.release()
threads = []
"""
'nloops' is the number of times an item will be produced and
consumed.
"""
nloops = random.randrange(3, 6)
for func in [producer, consumer]:
    threads.append(Thread(target=func, args=(box, nloops)))
    threads[-1].start()  # Starts the thread.
for thread in threads:
    """Waits for the threads to complete before moving on
       with the main script.
    """
    thread.join()
print("All done.")

Возможны и другие применения для Condition. Например, при разработке потокового API, который уведомляет клиента о времени начала доступности данных.

Barriers

Барьеры являются простыми примитивами синхронизации и используются потоками для ожидании друг друга. Каждый поток пытается передать барьер с помощью вызова метода wait(), который будет блокироваться, пока все потоки не создадут этот вызов. Как только это произойдет, потоки будут запущены одновременно. Следующий фрагмент демонстрирует использование Barrier.

#barrier_tut.py
from random import randrange
from threading import Barrier, Thread
from time import ctime, sleep

num = 4
# 4 threads will need to pass this barrier to get released.
b = Barrier(num)
names = [“Harsh”, “Lokesh”, “George”, “Iqbal”]

def player():
    name = names.pop()
    sleep(randrange(2, 5))
    print(“%s reached the barrier at: %s” % (name, ctime()))
    b.wait()
    
threads = []
print(“Race starts now…”)

for i in range(num):
    threads.append(Thread(target=player))
    threads[-1].start()
"""
Following loop enables waiting for the threads to complete before moving on with the main script.
"""
for thread in threads:
    thread.join()
print()
print(“Race over!”)

Для барьеров можно найти множество применений, одним из которых может стать синхронизация работы сервера и клиента, поскольку серверу часто приходится ожидать клиента после инициализации.

На этом завершим обсуждение примитивов синхронизации в Python.

Этот пост написан как решение упражнения в книге «Программирование приложений на основе ядра Python» Уэсли Чана. Если этот пост вам понравился, познакомьтесь с другими моими работами из этой книги на GitHub. Исходные коды из этой статьи также доступны в моем профиле.

Специально для сайта ITWORLD.UZ. Новость взята с сайта NOP::Nuances of programming