Python之多进程编程之multiprocessing模块

multiprocessing模块是Python基于进程的并发相关的模块,和threading模块类似,还提供了threading模块中没有的API,比如Pool对象,他可以很方便的对多个函数或者分布式数据并行执行和运算。

python-multithreading

0x00 多进程的特点

多进程执行特点为:他们本质上就是异步的,需要多个并发事务,各个事务的运行顺序可以是不确定的、随机的、不可预测的。特别适合运行密集型任务。

threading-pic多进程三状态转换图

从图中可以看出,先启动的进程,只是先进入了就绪状态,并未进行运行,所以多进程的运行顺序是不确定的。而多进程分配时间片刻的时候,也是有操作系统调度完成的,因而其有随机性与不可预测性。

0x01 启用多进程的两种方式

在Windows系统中,父进程一定要在if __name__ == ‘__main__’中启动,因为在Windows系统中,子进程会默认import父进程的所有东西,如果没有这个条件 ,那么将会造成无限Recursion的局面,当然就是报错了!

a.常规方法

import os
from multiprocessing import Process

def func(i):
    print(i,os.getpid(),os.getppid())

if __name__ == '__main__':
    p = Process(target=func,args=('test',))
    p.start()
    print(os.getpid())
'''
200
test 2800 200
'''

创建一个Process的对象,通过start()来创建新的进程。在创建Process对象的时候,需要设置的一个目标函数与参数。target后设置目标函数即可。而arges则放入相应的参数,如果没有参数则可以省略该部分,若只有一个参数的时候,需要加一个逗号,因为args传入的必须是一个元祖对象。

b.面向对象方法

from multiprocessing import Process
import os

class myProcess(Process):
    def run(self):
        print(os.getpid(),os.getppid())

if __name__ == '__main__':
    print(os.getpid())
    p = myProcess()
    p.start()
'''
1376
3916 1376
'''

通过实现一个类其继承Process类,之后重新run()方法即可,启动进程的时候也是使用start()函数,start()函数启动后默认调用rum()方法,从而启动进程。

使用面向对象的方法去启开多进程,那么加参数就相比常规方法复杂了!首先,我想到的时候重写__init__方法,但是先看下父类Process的__init__方法。

process-class-init在父类的__init__中,其实默认已经有一些参数了,比如进程号,都不需要使用os.getpid()就可以获取,如果重写了子类的__init__方法的话,那么这些属性都将不在,因而会出问题。

from multiprocessing import Process
import os

class myProcess(Process):
    def __init__(self,i):
        super().__init__()
        self.i = i

    def run(self):
        print(self.i,os.getpid(),os.getppid())

if __name__ == '__main__':
    print(os.getpid())
    p = myProcess('args')
    p.start()
'''
7224
args 3144 7224
'''

因而可以通过在子类中调用父类的__init__方法,在加入自己的参数即可。不过好像面向对象的方法的确比常规方法要复杂,所以还是推荐使用常规方法启动多进程。

0x02 多线程的主要函数与参数

从上面的源码中可以看出,线程对象中已经有了一些属性了,而有三个属性稍微重要需要记忆!

  • name    #进程的名称
  • pid    #返回进程ID(os.getpid同效)
  • daemon    #设置为守护进程

几个重要的函数:

  • start()    #启动进程
  • terminate()    #停止进程,不是立即生效,而是等待系统回收
  • is_alive()    #判断进程是否还存在
  • join()    #发生阻塞,知道调用join()方法的进程终止

0x03 守护进程

默认情况下,若子进程需要执行很长时间,而父进程只需要很短的时间即可执行完毕,那么父进程执行完毕后即发生阻塞状态,等待子进程执行完毕后才结束整个程序。

而若果有需要,当父进程结束的时候,那么子进程就要立即结束,那么就需要在子进程启动(start())前,将其设置为守护进程。

import time
from multiprocessing import Process

def func(n):
    while True:
        print(n)
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=func,args=(10,))
    p.daemon = True
    p.start()
    print('main')   #main

子进程是一个无线循环的函数,理论上运行需要无限长的时间,而主进程仅仅是一个打印工作,那么执行上面代码后,仅仅输出个main就结束了(若没有p.daemon = True的话,那么打印一个main后,将一直打印10),其结束的时候也终止了他的守护进程(即子进程)。不过值得注意的是守护进程不允许创建子进程

0x04 关于进程的同步控制Lock\Semaphore\Event

关于Lock

现实中有那么一种情况,买火车票,成百上千人在同事查询票(这种肯定是多线程了,毕竟那么多人),都显示买1张票,如果仅仅按照之前的例子去写代码的话,就会发生一种情况,有几个都显示有1票,都去买了,最后就会有人买到了负数的票,这种是由于访问数据不安全所以引起,因而如果限定再进行买票这个动作的时候,只能有一个进程去发生,那么就不会发生这种情况了。

import time
import json
from multiprocessing import Process
from multiprocessing import Lock

def showTicket(i):
    with open('ticket') as f:
        dic = json.load(f)
    print('%d余票:%s'%(i,dic['ticket']))

def buyTicket(i,lock):
    lock.acquire()  #加锁
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0:
        dic['ticket'] -= 1
        print('%s买到'%i)
    else:
        print('%s卖完了'%i)

    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)

    lock.release()  #取锁

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=showTicket,args=(i,))
        p.start()

    lock = Lock()
    for i in range(10):
        p = Process(target=buyTicket,args=(i,lock))
        p.start()

通过对买票的动作进行加锁后,同时只能有一个进程去对票数进程操作,而不会在操作完毕后归回锁后,下一个进程才可以继续操作。

关于Semaphore

比如一家KTV,其只有5个包厢,因而只能同时有5个线程去唱歌,别的进程只能等待这5个线程完毕后才能进去。那么Lock只是同时限定了1个线程进行访问,而Semaphore则是Lock的加强版,可以指定同时多少个进行进行操作。

import time,random
from multiprocessing import Process,Semaphore

def sing(i,semaphore):
    semaphore.acquire()
    print('%s进入KTV!!!'%i)
    time.sleep(5)
    print('%s走出KTV!!!'%i)
    semaphore.release()

if __name__ == '__main__':
    semaphore = Semaphore(5)
    for i in range(20):
        p = Process(target=sing,args=(i,semaphore))
        p.start()

Semaphore的操作和Lock的一样,也是通过加锁和取锁来完成对线程数量的控制,执行上面的代码可以看出输出的东西都是一进一出的,并且是5个一起的。

关于Event

Event是用过控制多个进程的执行或者阻塞。比如红绿灯,当绿灯的时候,所有的线程都可以通过,而红灯的时候,所有的线程都得停下。

创建一个Event对象后,默认其值为False,即阻塞状态,但是还需要通过wait()方法才能达到真正的阻塞效果!set()方法是将其值设置为True,clear()方法则清除设置,还原为False。

import time,random
from multiprocessing import Process,Event

def light(event):
    while True:
        if event.is_set():
            print('红灯!!!')
            event.clear()
        else:
            print('绿灯!!!')
            event.set()
        time.sleep(4)

def car(event,i):
    if not event.is_set():
        print('%s车已经停下'%i)
        event.wait()
    print('%s车已经出发'%i)

if __name__ == '__main__':
    event = Event()
    p = Process(target=light,args=(event,))
    p.start()
    for i in range(10):
        p1 = Process(target=car,args=(event,i))
        p1.start()
        time.sleep(random.randint(1,4))

通过上面的代码就可以实现了,对所有进程达到同步控制的效果了!

0x05 关于进程间的通信Queue(JoinableQueue)\Pipe

进程之间的通信主要是通过IPC(Inter-Process Communication)来完成的。

关于Queue

Queue和queue是两个东西哦!Queue是用于创建共享的进程队列,是多进程安全的队列,可以通过使用Queue来实现多进程直接的数据传递,也是较为推荐的一种方法!(其自动已经实现了Lock的效果,因而为数据安全型)

Queue有几个方法

  • put()    #给队列中放值
  • get()    #取出队列中的值
  • empty()    #判断队列是否为空
  • full()    #判断队列是否装满
  • get_nowait()    #即使队列已经空了,还是强行取值,之后抱错EOFError

比较有名的模型为生产者消费者模型。

import time
from multiprocessing import Process,Queue

def consume(name,queue):
    while True:
        food = queue.get()
        print('%s消耗了%s'%(name,food))
        time.sleep(2)

def produce(name,num,thing,queue):
    for i in range(num):
        time.sleep(1)
        food ='%s制造了第%s个%s'%(name,i,thing)
        print(food)
        queue.put(food)



if __name__ == '__main__':
    queue = Queue()
    p = Process(target=produce,args=('xzymoe',10,'包子',queue))
    p.start()
    c1 = Process(target=consume,args=('AAAA',queue))
    c1.start()
    c2 = Process(target=consume,args=('BBBB',queue))
    c2.start()

上面的代码就完成了有一个进程负责产生数据,而另外2个进程负责消耗数据,但是有一个问题。当数据已经生产完毕后,不再有新的数据了,程序在get()的地方就发生了阻塞,并且没有结束程序。

关于JoinableQueue

为了让消费者用完数据后不发生阻塞,那么只能通过JoinableQueue来实现。

import time
from multiprocessing import Process,JoinableQueue

def consume(name,queue):
    while True:
        food = queue.get()
        print('%s消耗了%s'%(name,food))
        time.sleep(2)
        queue.task_done()   #感知到生产者已经生产完毕数据,当数据被用完时,告诉生产者,放行生产者

def produce(name,num,thing,queue):
    for i in range(num):
        time.sleep(1)
        food ='%s制造了第%s个%s'%(name,i,thing)
        print(food)
        queue.put(food)
    queue.join()    #代码到这里就说明生成数据完毕了,此进程发生阻塞,等待消费者用完数据。

if __name__ == '__main__':
    queue = JoinableQueue()
    p1 = Process(target=produce,args=('DDDD',10,'包子',queue))
    p1.start()
    p2 = Process(target=produce,args=('CCCC',10,'包子',queue))
    p2.start()
    c1 = Process(target=consume,args=('AAAA',queue))
    c1.daemon = True
    c1.start()
    c2 = Process(target=consume,args=('BBBB',queue))
    c2.daemon = True
    c2.start()
    #当消费者通过task_done()告诉生产者,数据已经消耗完毕,则生产者进程执行完毕。
    #由于设置了消费者为守护进程,生产者join()后,就执行完毕主进程,从而关闭子进程。
    p1.join()
    p2.join()

代码的注释中,已经解释了,JoinableQueue的工作原理了。值得注意的是,produce执行完毕后,在join()的地方会发生阻塞,消费者感知到数据已经生产完毕后,通过task_done()方法来告诉生产者数据消耗完毕,从而放行produce,而produce才能执行完毕。

在主进程中,通过对p1、p2进程使用join()阻塞,并让消费者c1、c2为守护进程,则可以实现生产者生产完毕数据后,等待消费者消耗完毕数据后,停止进程而不发生阻塞。

关于Pipe

管道相较于队列来说,是数据不安全的,当没有对管道口关闭完全的时候,那么也会出现一些问题,所以不是很推荐使用管道,而是推荐使用队列。

管道的操作类似于套接字socket链接后链接对象的操作,也是只用send()和recv(),不过在传输和接收的过程中不需要使用bytes类型的数据。还是使用生产者消费者模型。

from multiprocessing import Pipe,Process,Lock
import random,time

def produce(com_conn,pro_conn,name,food):
    com_conn.close()
    for i in range(5):
        f = '%s制作了%s个%s'%(name,i,food)
        print(f)
        time.sleep(random.randint(1,3))
        pro_conn.send(f)
    pro_conn.close()

def comsume(com_conn,pro_conn,name,lock):
    pro_conn.close()
    while True:
        try:
            lock.acquire()
            food = com_conn.recv()
            lock.release()
            time.sleep(1)
            print('%s消费了%s' % (name,food))
        except EOFError:
            #一个线程抛出异常后,还没还锁,所以在except中要还锁,不然别的线程会阻塞
            lock.release()
            com_conn.close()
            break


if __name__ == '__main__':
    com,pro = Pipe()
    lock = Lock()
    p = Process(target=produce,args=(com,pro,'AAA','泔水'))
    p.start()
    c1 = Process(target=comsume,args=(com,pro,'BBB',lock))
    c2 = Process(target=comsume,args=(com,pro,'CCC',lock))
    c1.start()
    c2.start()
    com.close()
    pro.close()

从第30行代码看,创建出来一个Pipe对象的时候,将会返回两个对象,类似于套接字accept()后一样。另外在进程中使用了Lock对象,说明Pipe的数据不安全问题。

在管道Pipe上最应该注意的是管道的端点管理的问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

0x06 进程间的数据共享Manager

用途不大,还有点玄学,暂时没研究,暂时忽略!!

0x07 进程池Pool

当工作需要创建很多的进程去完成时候,由于CPU核心数目的限制(os.cpu_count())的限制,另外启动线程与结束线程都需要消耗大量的系统资源,因而推出了进程池的概念。

进程池类似于一个池子中有固定数目的线程(一般采用CPU核心数目+1),比如5个进程,等待这你提交任务,一次执行5个任务,执行完毕的任务退出线程,待执行的任务一次进入进程,从而完成任务。由于进程池中的进程数目是固定的,所以从创建进程池后就不会在增加系统的调度难度了,节省了开闭进程的时间,也一定程度上实现了并发的效果。

进程池主要方法:

  • apply()    #阻塞直到执行结果完成,func仅仅在进程池中的其中一个工作进程中执行。同步效果!返回值为return值!
  • apply_async    #apply()方法的一个变体,返回一个结果对象。是异步非阻塞式的。
  • close()    #阻止任何更多的任务提交到进程池中,一单所有任务完成,工作进程将退出。(即任务执行完毕后,关闭进程池中的进程)
  • join()
  • map()

进程池中的子进程与主进程直接的关系是,主进程执行完毕后会自动结束程序,有点感觉进程池中的程序都是守护线程,因而为了避免进程池中的任务还没执行完毕主进程就结束了,那么所以要用close()与join()。

import time,os
from multiprocessing import Pool

def func(num):
    time.sleep(2)
    print(num,'++++')

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(20):
        pool.apply_async(func,args=(i,))
    pool.close()
    pool.join()
    print(os.getpid())

从执行结果上看,是五组数据五组数据的输出的,待range(20)都完毕后,那么才输出主进程的pid。

如果没有pool.close()和pool.join()的话,那么一般来说只输出了主进程pid程序就结束了,主进程并没有阻塞后等待进程池执行任务。

关于进程池的返回值

子进程中是无法返回一个值给父进程的,所以需要通过IPC(队列,管道)去完成。因而返回到主进程中是进程池特有的。

在apply()中
import time
from multiprocessing import Pool

def func(num):
    time.sleep(1)
    return num

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(20):
        res = pool.apply(func,args=(i,))
        print(res)  #0,1,2,3.....

从打印结果可以看出,是一个一个打印出来的,函数的返回值就是进程池的返回值!

在apply_async()中
import time
from multiprocessing import Pool

def func(num):
    time.sleep(1)
    return num

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(20):
        res = pool.apply_async(func,args=(i,))
        print(res)  #<multiprocessing.pool.ApplyResult object at 0x0000023EA23A0710>
        print(res.get())  #0,1,2,3.....
    pool.close()
    pool.join()

从res的打印结果可以看出在apply_async()返回的结果是一个对象,可以通过get()方法取值。虽然是异步提交的,但是打印结果也是顺序的一个一个的打印的,因为在get()的时候发生了阻塞。

关于回调函数callback

回调函数是,进程池中进程的返回值,成为回调函数的参数传入回调函数中。首先看看下,进程池中的进程、回调函数进程和主进程之间的关系。

import os
from multiprocessing import Pool

def func1(num):
    print('func1',num,os.getpid())
    return num*num

def func2(num):
    print('func2',num,os.getpid())

if __name__ == '__main__':
    pool = Pool(2)
    for i in range(3):
        pool.apply_async(func1,args=(i,),callback=func2)
    pool.close()
    pool.join()
    print('main',os.getpid())
'''
func1 0 8712
func1 1 8712
func2 0 15200
func1 2 8712
func2 1 15200
func2 4 15200
main 15200
'''

通过进程的pid可以看出,进程池中的进程是子进程,而回调函数的pid与主进程的pid一致,说明回调函数是在主进程里运行的。为什么要先看这个呢?因为看了这个你才能名称回调函数设计的意义。

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

所以回调函数经常可以用在写爬虫中,因为抓取数据由于网络延时的问题,所以比较费时间,而抓到数据后,进行数据筛选就很快了,这样丢给主进程,就可以让需要花费更多的时间的爬取任务丢给进程池来做。

本来想抓取猫眼电影里的东西,可能是正则写错次数太多了,被爬的,所以被禁止访问了,之后就去爬豆瓣电影了,写完了代码后准备加翻页功能的时候,发现只有一页。不过思路大概是这样的就行了!!!

import re
import requests
from multiprocessing import Pool

def get(url,pattern):
    response = requests.get(url)
    res = response.content.decode('utf-8')
    res = pattern.findall(res)
    return res

def show(res):
    for i in res:
        print('名称:%s,日期:%s,演员:%s'%(i[0],i[1],i[2]))

if __name__ == '__main__':
    regex = r'

<table width="100%" class="">.*?title="(.*?)">.*?

(.*?)/(.*?)

.*?</table>


'
    pattern = re.compile(regex,re.S)
    url = 'https://movie.douban.com/chart'
    pool = Pool(5)
    pool.apply_async(get,args=(url,pattern),callback=show)
    pool.close()
    pool.join()
    print('Finished!!!')

将爬取后的页面的结果,通过回调函数返回给主进程,如果还有第二第三页。。。的时候,那么get()方法(突然发现起了个不太好的名字!!!因为apply_async()方法的返回值取值也是通过get()方法的,不过这个例子不影响。)可以进行新的页面爬取,而主进程可以很快的就完成排版show()功能。运行结果如下。

呀!感觉正则写的还是糟糕!!!要用到requests模块,自行pip3 install requests即可!

发表评论