multiprocessing模块是Python基于进程的并发相关的模块,和threading模块类似,还提供了threading模块中没有的API,比如Pool对象,他可以很方便的对多个函数或者分布式数据并行执行和运算。
0x00 多进程的特点
多进程执行特点为:他们本质上就是异步的,需要多个并发事务,各个事务的运行顺序可以是不确定的、随机的、不可预测的。特别适合运行密集型任务。
从图中可以看出,先启动的进程,只是先进入了就绪状态,并未进行运行,所以多进程的运行顺序是不确定的。而多进程分配时间片刻的时候,也是有操作系统调度完成的,因而其有随机性与不可预测性。
0x01 启用多进程的两种方式
在Windows系统中,父进程一定要在if __name__ == ‘__main__’中启动,因为在Windows系统中,子进程会默认import父进程的所有东西,如果没有这个条件 ,那么将会造成无限Recursion的局面,当然就是报错了!
a.常规方法
import os from multiprocessing import Process def func(i): print(i,os.getpid(),os.getppid()) if __name__ == '__main__': p = Process(target=func,args=('test',)) p.start() print(os.getpid()) ''' 200 test 2800 200 '''
创建一个Process的对象,通过start()来创建新的进程。在创建Process对象的时候,需要设置的一个目标函数与参数。target后设置目标函数即可。而arges则放入相应的参数,如果没有参数则可以省略该部分,若只有一个参数的时候,需要加一个逗号,因为args传入的必须是一个元祖对象。
b.面向对象方法
from multiprocessing import Process import os class myProcess(Process): def run(self): print(os.getpid(),os.getppid()) if __name__ == '__main__': print(os.getpid()) p = myProcess() p.start() ''' 1376 3916 1376 '''
通过实现一个类其继承Process类,之后重新run()方法即可,启动进程的时候也是使用start()函数,start()函数启动后默认调用rum()方法,从而启动进程。
使用面向对象的方法去启开多进程,那么加参数就相比常规方法复杂了!首先,我想到的时候重写__init__方法,但是先看下父类Process的__init__方法。
在父类的__init__中,其实默认已经有一些参数了,比如进程号,都不需要使用os.getpid()就可以获取,如果重写了子类的__init__方法的话,那么这些属性都将不在,因而会出问题。
from multiprocessing import Process import os class myProcess(Process): def __init__(self,i): super().__init__() self.i = i def run(self): print(self.i,os.getpid(),os.getppid()) if __name__ == '__main__': print(os.getpid()) p = myProcess('args') p.start() ''' 7224 args 3144 7224 '''
因而可以通过在子类中调用父类的__init__方法,在加入自己的参数即可。不过好像面向对象的方法的确比常规方法要复杂,所以还是推荐使用常规方法启动多进程。
0x02 多线程的主要函数与参数
从上面的源码中可以看出,线程对象中已经有了一些属性了,而有三个属性稍微重要需要记忆!
- name #进程的名称
- pid #返回进程ID(os.getpid同效)
- daemon #设置为守护进程
几个重要的函数:
- start() #启动进程
- terminate() #停止进程,不是立即生效,而是等待系统回收
- is_alive() #判断进程是否还存在
- join() #发生阻塞,知道调用join()方法的进程终止
0x03 守护进程
默认情况下,若子进程需要执行很长时间,而父进程只需要很短的时间即可执行完毕,那么父进程执行完毕后即发生阻塞状态,等待子进程执行完毕后才结束整个程序。
而若果有需要,当父进程结束的时候,那么子进程就要立即结束,那么就需要在子进程启动(start())前,将其设置为守护进程。
import time from multiprocessing import Process def func(n): while True: print(n) time.sleep(1) if __name__ == '__main__': p = Process(target=func,args=(10,)) p.daemon = True p.start() print('main') #main
子进程是一个无线循环的函数,理论上运行需要无限长的时间,而主进程仅仅是一个打印工作,那么执行上面代码后,仅仅输出个main就结束了(若没有p.daemon = True的话,那么打印一个main后,将一直打印10),其结束的时候也终止了他的守护进程(即子进程)。不过值得注意的是守护进程不允许创建子进程。
0x04 关于进程的同步控制Lock\Semaphore\Event
关于Lock
现实中有那么一种情况,买火车票,成百上千人在同事查询票(这种肯定是多线程了,毕竟那么多人),都显示买1张票,如果仅仅按照之前的例子去写代码的话,就会发生一种情况,有几个都显示有1票,都去买了,最后就会有人买到了负数的票,这种是由于访问数据不安全所以引起,因而如果限定再进行买票这个动作的时候,只能有一个进程去发生,那么就不会发生这种情况了。
import time import json from multiprocessing import Process from multiprocessing import Lock def showTicket(i): with open('ticket') as f: dic = json.load(f) print('%d余票:%s'%(i,dic['ticket'])) def buyTicket(i,lock): lock.acquire() #加锁 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0: dic['ticket'] -= 1 print('%s买到'%i) else: print('%s卖完了'%i) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) lock.release() #取锁 if __name__ == '__main__': for i in range(10): p = Process(target=showTicket,args=(i,)) p.start() lock = Lock() for i in range(10): p = Process(target=buyTicket,args=(i,lock)) p.start()
通过对买票的动作进行加锁后,同时只能有一个进程去对票数进程操作,而不会在操作完毕后归回锁后,下一个进程才可以继续操作。
关于Semaphore
比如一家KTV,其只有5个包厢,因而只能同时有5个线程去唱歌,别的进程只能等待这5个线程完毕后才能进去。那么Lock只是同时限定了1个线程进行访问,而Semaphore则是Lock的加强版,可以指定同时多少个进行进行操作。
import time,random from multiprocessing import Process,Semaphore def sing(i,semaphore): semaphore.acquire() print('%s进入KTV!!!'%i) time.sleep(5) print('%s走出KTV!!!'%i) semaphore.release() if __name__ == '__main__': semaphore = Semaphore(5) for i in range(20): p = Process(target=sing,args=(i,semaphore)) p.start()
Semaphore的操作和Lock的一样,也是通过加锁和取锁来完成对线程数量的控制,执行上面的代码可以看出输出的东西都是一进一出的,并且是5个一起的。
关于Event
Event是用过控制多个进程的执行或者阻塞。比如红绿灯,当绿灯的时候,所有的线程都可以通过,而红灯的时候,所有的线程都得停下。
创建一个Event对象后,默认其值为False,即阻塞状态,但是还需要通过wait()方法才能达到真正的阻塞效果!set()方法是将其值设置为True,clear()方法则清除设置,还原为False。
import time,random from multiprocessing import Process,Event def light(event): while True: if event.is_set(): print('红灯!!!') event.clear() else: print('绿灯!!!') event.set() time.sleep(4) def car(event,i): if not event.is_set(): print('%s车已经停下'%i) event.wait() print('%s车已经出发'%i) if __name__ == '__main__': event = Event() p = Process(target=light,args=(event,)) p.start() for i in range(10): p1 = Process(target=car,args=(event,i)) p1.start() time.sleep(random.randint(1,4))
通过上面的代码就可以实现了,对所有进程达到同步控制的效果了!
0x05 关于进程间的通信Queue(JoinableQueue)\Pipe
进程之间的通信主要是通过IPC(Inter-Process Communication)来完成的。
关于Queue
Queue和queue是两个东西哦!Queue是用于创建共享的进程队列,是多进程安全的队列,可以通过使用Queue来实现多进程直接的数据传递,也是较为推荐的一种方法!(其自动已经实现了Lock的效果,因而为数据安全型)
Queue有几个方法
- put() #给队列中放值
- get() #取出队列中的值
- empty() #判断队列是否为空
- full() #判断队列是否装满
- get_nowait() #即使队列已经空了,还是强行取值,之后抱错EOFError
比较有名的模型为生产者消费者模型。
import time from multiprocessing import Process,Queue def consume(name,queue): while True: food = queue.get() print('%s消耗了%s'%(name,food)) time.sleep(2) def produce(name,num,thing,queue): for i in range(num): time.sleep(1) food ='%s制造了第%s个%s'%(name,i,thing) print(food) queue.put(food) if __name__ == '__main__': queue = Queue() p = Process(target=produce,args=('xzymoe',10,'包子',queue)) p.start() c1 = Process(target=consume,args=('AAAA',queue)) c1.start() c2 = Process(target=consume,args=('BBBB',queue)) c2.start()
上面的代码就完成了有一个进程负责产生数据,而另外2个进程负责消耗数据,但是有一个问题。当数据已经生产完毕后,不再有新的数据了,程序在get()的地方就发生了阻塞,并且没有结束程序。
关于JoinableQueue
为了让消费者用完数据后不发生阻塞,那么只能通过JoinableQueue来实现。
import time from multiprocessing import Process,JoinableQueue def consume(name,queue): while True: food = queue.get() print('%s消耗了%s'%(name,food)) time.sleep(2) queue.task_done() #感知到生产者已经生产完毕数据,当数据被用完时,告诉生产者,放行生产者 def produce(name,num,thing,queue): for i in range(num): time.sleep(1) food ='%s制造了第%s个%s'%(name,i,thing) print(food) queue.put(food) queue.join() #代码到这里就说明生成数据完毕了,此进程发生阻塞,等待消费者用完数据。 if __name__ == '__main__': queue = JoinableQueue() p1 = Process(target=produce,args=('DDDD',10,'包子',queue)) p1.start() p2 = Process(target=produce,args=('CCCC',10,'包子',queue)) p2.start() c1 = Process(target=consume,args=('AAAA',queue)) c1.daemon = True c1.start() c2 = Process(target=consume,args=('BBBB',queue)) c2.daemon = True c2.start() #当消费者通过task_done()告诉生产者,数据已经消耗完毕,则生产者进程执行完毕。 #由于设置了消费者为守护进程,生产者join()后,就执行完毕主进程,从而关闭子进程。 p1.join() p2.join()
代码的注释中,已经解释了,JoinableQueue的工作原理了。值得注意的是,produce执行完毕后,在join()的地方会发生阻塞,消费者感知到数据已经生产完毕后,通过task_done()方法来告诉生产者数据消耗完毕,从而放行produce,而produce才能执行完毕。
在主进程中,通过对p1、p2进程使用join()阻塞,并让消费者c1、c2为守护进程,则可以实现生产者生产完毕数据后,等待消费者消耗完毕数据后,停止进程而不发生阻塞。
关于Pipe
管道相较于队列来说,是数据不安全的,当没有对管道口关闭完全的时候,那么也会出现一些问题,所以不是很推荐使用管道,而是推荐使用队列。
管道的操作类似于套接字socket链接后链接对象的操作,也是只用send()和recv(),不过在传输和接收的过程中不需要使用bytes类型的数据。还是使用生产者消费者模型。
from multiprocessing import Pipe,Process,Lock import random,time def produce(com_conn,pro_conn,name,food): com_conn.close() for i in range(5): f = '%s制作了%s个%s'%(name,i,food) print(f) time.sleep(random.randint(1,3)) pro_conn.send(f) pro_conn.close() def comsume(com_conn,pro_conn,name,lock): pro_conn.close() while True: try: lock.acquire() food = com_conn.recv() lock.release() time.sleep(1) print('%s消费了%s' % (name,food)) except EOFError: #一个线程抛出异常后,还没还锁,所以在except中要还锁,不然别的线程会阻塞 lock.release() com_conn.close() break if __name__ == '__main__': com,pro = Pipe() lock = Lock() p = Process(target=produce,args=(com,pro,'AAA','泔水')) p.start() c1 = Process(target=comsume,args=(com,pro,'BBB',lock)) c2 = Process(target=comsume,args=(com,pro,'CCC',lock)) c1.start() c2.start() com.close() pro.close()
从第30行代码看,创建出来一个Pipe对象的时候,将会返回两个对象,类似于套接字accept()后一样。另外在进程中使用了Lock对象,说明Pipe的数据不安全问题。
在管道Pipe上最应该注意的是管道的端点管理的问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
0x06 进程间的数据共享Manager
用途不大,还有点玄学,暂时没研究,暂时忽略!!
0x07 进程池Pool
当工作需要创建很多的进程去完成时候,由于CPU核心数目的限制(os.cpu_count())的限制,另外启动线程与结束线程都需要消耗大量的系统资源,因而推出了进程池的概念。
进程池类似于一个池子中有固定数目的线程(一般采用CPU核心数目+1),比如5个进程,等待这你提交任务,一次执行5个任务,执行完毕的任务退出线程,待执行的任务一次进入进程,从而完成任务。由于进程池中的进程数目是固定的,所以从创建进程池后就不会在增加系统的调度难度了,节省了开闭进程的时间,也一定程度上实现了并发的效果。
进程池主要方法:
- apply() #阻塞直到执行结果完成,func仅仅在进程池中的其中一个工作进程中执行。同步效果!返回值为return值!
- apply_async #apply()方法的一个变体,返回一个结果对象。是异步非阻塞式的。
- close() #阻止任何更多的任务提交到进程池中,一单所有任务完成,工作进程将退出。(即任务执行完毕后,关闭进程池中的进程)
- join()
- map()
进程池中的子进程与主进程直接的关系是,主进程执行完毕后会自动结束程序,有点感觉进程池中的程序都是守护线程,因而为了避免进程池中的任务还没执行完毕主进程就结束了,那么所以要用close()与join()。
import time,os from multiprocessing import Pool def func(num): time.sleep(2) print(num,'++++') if __name__ == '__main__': pool = Pool(5) for i in range(20): pool.apply_async(func,args=(i,)) pool.close() pool.join() print(os.getpid())
从执行结果上看,是五组数据五组数据的输出的,待range(20)都完毕后,那么才输出主进程的pid。
如果没有pool.close()和pool.join()的话,那么一般来说只输出了主进程pid程序就结束了,主进程并没有阻塞后等待进程池执行任务。
关于进程池的返回值
子进程中是无法返回一个值给父进程的,所以需要通过IPC(队列,管道)去完成。因而返回到主进程中是进程池特有的。
在apply()中
import time from multiprocessing import Pool def func(num): time.sleep(1) return num if __name__ == '__main__': pool = Pool(5) for i in range(20): res = pool.apply(func,args=(i,)) print(res) #0,1,2,3.....
从打印结果可以看出,是一个一个打印出来的,函数的返回值就是进程池的返回值!
在apply_async()中
import time from multiprocessing import Pool def func(num): time.sleep(1) return num if __name__ == '__main__': pool = Pool(5) for i in range(20): res = pool.apply_async(func,args=(i,)) print(res) #<multiprocessing.pool.ApplyResult object at 0x0000023EA23A0710> print(res.get()) #0,1,2,3..... pool.close() pool.join()
从res的打印结果可以看出在apply_async()返回的结果是一个对象,可以通过get()方法取值。虽然是异步提交的,但是打印结果也是顺序的一个一个的打印的,因为在get()的时候发生了阻塞。
关于回调函数callback
回调函数是,进程池中进程的返回值,成为回调函数的参数传入回调函数中。首先看看下,进程池中的进程、回调函数进程和主进程之间的关系。
import os from multiprocessing import Pool def func1(num): print('func1',num,os.getpid()) return num*num def func2(num): print('func2',num,os.getpid()) if __name__ == '__main__': pool = Pool(2) for i in range(3): pool.apply_async(func1,args=(i,),callback=func2) pool.close() pool.join() print('main',os.getpid()) ''' func1 0 8712 func1 1 8712 func2 0 15200 func1 2 8712 func2 1 15200 func2 4 15200 main 15200 '''
通过进程的pid可以看出,进程池中的进程是子进程,而回调函数的pid与主进程的pid一致,说明回调函数是在主进程里运行的。为什么要先看这个呢?因为看了这个你才能名称回调函数设计的意义。
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
所以回调函数经常可以用在写爬虫中,因为抓取数据由于网络延时的问题,所以比较费时间,而抓到数据后,进行数据筛选就很快了,这样丢给主进程,就可以让需要花费更多的时间的爬取任务丢给进程池来做。
本来想抓取猫眼电影里的东西,可能是正则写错次数太多了,被爬的,所以被禁止访问了,之后就去爬豆瓣电影了,写完了代码后准备加翻页功能的时候,发现只有一页。不过思路大概是这样的就行了!!!
import re import requests from multiprocessing import Pool def get(url,pattern): response = requests.get(url) res = response.content.decode('utf-8') res = pattern.findall(res) return res def show(res): for i in res: print('名称:%s,日期:%s,演员:%s'%(i[0],i[1],i[2])) if __name__ == '__main__': regex = r' <table width="100%" class="">.*?title="(.*?)">.*? (.*?)/(.*?) .*?</table> ' pattern = re.compile(regex,re.S) url = 'https://movie.douban.com/chart' pool = Pool(5) pool.apply_async(get,args=(url,pattern),callback=show) pool.close() pool.join() print('Finished!!!')
将爬取后的页面的结果,通过回调函数返回给主进程,如果还有第二第三页。。。的时候,那么get()方法(突然发现起了个不太好的名字!!!因为apply_async()方法的返回值取值也是通过get()方法的,不过这个例子不影响。)可以进行新的页面爬取,而主进程可以很快的就完成排版show()功能。运行结果如下。