进程间通信

2019-08-13

进程间通信

多个进程修改同一个数据的时候:

用锁:牺牲速度,保证数据安全。即同一时间只能有一个任务在操作数据。

缺点:效率低,需要自己加锁,可能造成死锁。

优点:保证数据安全

基于锁的缺点,我们需要一个方案:效率高,又能帮忙解决锁问题。这就是multiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道

队列和管道都是将数据放在内存中。

队列又是基于管道+锁实现的,帮我们解决了复杂的锁问题。

进程队列介绍:

from multiprocessing import Queue

q = Queue(5)  # 创建队列,5表示队列最大存储数
q.put() # 添加数据,超出最大存储数,不会报错,会原地等待,进入阻塞态
q.get() # 取出数据,队列中为空时,不会报错,进入阻塞,直至有人往队列放入值
q.full() # 判断队列是否满了
q.empty # 判断队列数据是否取完
q.get_nowait() # 取值,没有值的话,不进入阻塞,直接报错
# 注意:full empty get_nowait 都不适用于多进程

两个进程之间的通信:

from multiprocessing import Process, Queue

def producer(q):
    q.put(1)
    
def consumer(q):
    q.get()

if __name__ == '__main__':
    q = Queue(5)
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()

生产者消费者模型:

生产者:生产/制造数据的

消费者:消费/处理数据的

为了解决什么问题:供需不平衡

生产者与消费者模型是通过一个容器解决生产者与消费者的强耦合问题。这个容器就是阻塞队列!生数据和取数据都通过这个阻塞队列。

基于队列Queue解决生产者消费者问题(low ! )

from multiprocessing import Process, Queue
import time
import random
import os


def producer(q):
    print('我是生产者')

    for i in range(10):
        time.sleep(random.random())
        res = '包子{}'.format(i)
        q.put(res)
        print('{}生产了{}'.format(os.getpid(), res))

    # 发送一个生产结束的讯号
    # q.put(None)

def consumer(q):
    print('我是消费者')
    while True:
        time.sleep(random.random())
        res = q.get()
        if res == None: break
        print('{}消费了{}'.format(os.getpid(), res))


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q,))
    p1.start()
    p1.join()  # 讯号必须等生产者运行结束后才能发

    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.start()
    # 发送一个生产结束的讯号
    c2.start()
    q.put(None)
    q.put(None)  #如果有多个生产者,多个消费者,就得生产者.join多次,q.put(None)多次,非常low!
    print('主')

基于JoinableQueue解决生产者消费者问题:

from multiprocessing import JoinableQueue, Process
import time, random

def producer(q):
    for i in range(10):
        time.sleep(random.random())
        res = '包子{}'.format(i)
        q.put(res)
        print('生产者生产了一个{}'.format(res))
    q.join()

def consumer(q):
    while True:
        time.sleep(random.random())
        res = q.get()
        q.task_done()
        print('消费者消费了一个{}'.format(res))


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=((q,)))
    p2 = Process(target=producer, args=((q,)))

    c1 = Process(target=consumer, args=((q,)))
    c2 = Process(target=consumer, args=((q,)))
    # c1.daemon = True  # 消费者应该随着主进程的结束而结束
    # c2.daemon = True

    for i in [p1, p2, c1, c2]:
        i.start()

    p1.join()
    p2.join()


    print('主')