九. 并发编程 (进程池)
一 .进程池(multiprocess.Pool)
1.进程池概念
为什么要有进程池?进程池的概念。在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先
,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据
任务开启或者结束进程。那么我们要怎么做呢?在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将
进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,
池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。 Pool([numprocess [,initializer [, initargs]]]):创建进程池1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
import os
import time
import random
from multiprocessing import Pool
多进程和进程
from multiprocessing import Pool,Proces def run(n):for i in range(10):print(n+1)if __name__ == '__main__':
进程池p=Pool(5) # 参数表示5个进程 (类似于5个cppu)p.map(run, range(50)) # 50任务# 等同于上面只是时间上的差异 进程池更快 下面这个要慢带你 多进程for i in range(50):Process(target=run,args=(i,)).start()
进程池和多进程效率对比
注意:map : 这个map在这里自带了join方法 不需要人为添加
def run(n):for i in range(10):print(n+1)
if __name__ == '__main__':
进程池str1=time.time()p=Pool(5) # 参数表示5个进程 (类似于5个cppu)p.map(run, range(50)) # 50任务 这个map在这里自带了join方法 不需要人为添加t1=time.time()-str1 # 0.26728272438049316 进程池执行时间 多进程str2 = time.time()ret=[]for i in range(50):p1=Process(target=run,args=(i,))ret.append(p1)p1.start()for i in ret:i.join()t2=time.time()-str2 # 2.7745769023895264 没有使用进程池执行时间print(t1,t2)
map进程池传参数 # map : 这个map在这里自带了join方法 不需要人为添加 def run(n):for i in range(10):print(n+1)def run1(n):print(n) # ('aa', 100)print(n[1]) # 100if __name__ == '__main__':p=Pool(5) # 参数表示5个进程 (类似于5个cppu)p.map(run, range(5)) # 50任务 这个map在这里自带了join方法 不需要人为添加 p.map(run1,[("aa",100)])
2.进程同步(apply)
# 同步 import os,time from multiprocessing import Pooldef work(n):print("开始 run%s"%n,os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid())if __name__ == '__main__':p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(10):res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞# 但不管该任务是否存在阻塞,同步调用都会在原地等着print(res_l, "空") # # 执行结果: # 开始 run0 14036 # 结束 run0 14036 # 开始 run1 18312 # 结束 run1 18312 # 开始 run2 12744 # 结束 run2 12744 # 开始 run3 14036 # 结束 run3 14036 # 开始 run4 18312 # 结束 run4 18312 # 开始 run5 12744 # 结束 run5 12744 # 开始 run6 14036 # 结束 run6 14036 # 开始 run7 18312 # 结束 run7 18312 # 开始 run8 12744 # 结束 run8 12744 # 开始 run9 14036 # 结束 run9 14036 # [] 空 # 进程已结束,退出代码 0
3.进程异步( apply_async)
# 异步 带问题的 import os import time import random from multiprocessing import Pool def work(n):print("开始 run%s" % n, os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid()) if __name__ == '__main__':print("主进程")p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务for i in range(10):res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务# 注意这里是一个真异步 就是主进程不会等子进程结束 而是主进程执行完了不会管子进程 因为没有发感知进程池结束 # 执行结果 # 主进程 # 进程已结束,退出代码 0
# 异步 import os import time import random from multiprocessing import Pool def work(n):print("开始 run%s" % n, os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid()) if __name__ == '__main__':p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务for i in range(10):res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务p.close() # 结束进程池接收任务p.join() # 感知进程池中的任务结束# 执行结果 # 开始 run0 12284 # 开始 run1 4864 # 开始 run2 18452 # 结束 run0 12284 # 开始 run3 12284 # 结束 run1 4864 # 开始 run4 4864 # 结束 run2 18452 # 开始 run5 18452 # 结束 run3 12284 # 开始 run6 12284 # 结束 run4 4864 # 开始 run7 4864 # 结束 run5 18452 # 开始 run8 18452 # 结束 run6 12284 # 开始 run9 12284 # 结束 run7 4864 # 结束 run8 18452 # 结束 run9 12284 # 进程已结束,退出代码 0
import os import time import random from multiprocessing import Pooldef work(n):print("开始 run%s" % n, os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid()) if __name__ == '__main__':print("主进程")p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(5):res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务# 需要注意的是,进程池中的三个进程不会同时开启或者同时结束# 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res)# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close()p.join()# print(res_l,"》》》》》》》》》》")for res in res_l:print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get# 执行结果 主进程开始 run0 14684开始 run1 15180开始 run2 18996结束 run0 14684开始 run3 14684结束 run1 15180开始 run4 15180结束 run2 18996结束 run3 14684结束 run4 15180NoneNoneNoneNoneNone进程已结束,退出代码 0
from multiprocessing import Pool import time,random,osdef work(n):time.sleep(0.5)return n**2 if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待进程池中所有进程执行完毕for aa in res_l:print(aa.get()) # 获取进程池中所有数据# 0 # 1 # 4 # 9 # 16 # 25 # 36 # 49 # 64 # 81
# 如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。 from multiprocessing import Pool import time,random,osdef work(n):time.sleep(1)return n**2 if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待进程池中所有进程执行完毕 nums=[]for res in res_l:nums.append(res.get()) #拿到所有结果print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4.进程池版 socket
server
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程 from socket import * from multiprocessing import Pool import osserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn):print('进程pid: %s' %os.getpid())while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool(4)while True:conn,*_=server.accept()p.apply_async(talk,args=(conn,))# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
client
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))
5. 进程池回调函数(主要用于爬虫)
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:
我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数 # 先执行fun1函数 fun1函数的返回值 作为回调函数的参数 然后去执行回调函数
import os import time import random from multiprocessing import Pool# 先执行fun1函数 fun1函数的返回值 作为回调函数的参数 然后去执行回调函数 def fun1(n):print("这是fun1函数",os.getpid())return n*ndef fun2(aa): # 参数只能回调哪个参数print("这是fun2函数",os.getpid())print(aa)if __name__ == '__main__':print("主进程",os.getpid())p=Pool(5)p.apply_async(fun1,args=(10,),callback=fun2)p.close()p.join()# 说明fun2回调函数 回到 主进程中调用 执行结果主进程 19268这是fun1函数 12740这是fun2函数 19268100进程已结束,退出代码 0
import os import time import random from multiprocessing import Pool# 先执行fun1函数 fun1函数的返回值 作为回调函数的参数 然后去执行回调函数 def fun1(n):print("这是fun1函数",os.getpid())return n*ndef fun2(aa): # 参数只能回调哪个参数print("这是fun2函数",os.getpid())print(aa)if __name__ == '__main__':print("主进程",os.getpid())p=Pool(5)p.apply_async(fun1,args=(5,),callback=fun2)p.close()p.join()# 说明fun2回调函数 回到 主进程中调用 执行结果主进程 7164这是fun1函数 3272这是fun2函数 716425进程已结束,退出代码 0
爬虫案例
import requests from multiprocessing import Poolaa=requests.get("https://www.baidu.com") print(aa) print(aa.status_code) print(aa.text) print("11111",aa.content)
print(aa.__dict__) 查看里面属性
print("**************************************")
执行结果
200
ç¾åº¦ä¸ä¸ï¼ä½ å°±ç¥é ç»å½');
æ´å¤äº§å©2017 Baidu 使ç¨ç¾åº¦åå¿è¯» æè§åé¦ äº¬ICPè¯030173å·
![]()
11111 b'\r\n b' http-equiv=X-UA-Compatible content=IE=Edge> b'href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css>\xe7\x99\xbe\xe5\xba\xa6\ ' \
xe4\xb8\x80\xe4\xb8\x8b\xef\xbc\x8c\xe4\xbd\xa0\xe5\xb0\xb1\xe7\x9f\xa5\xe9\x81\x93
b'b'hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> b'u.com/s class="fm"> b'n name=f value=8> b'ame=tn value=baidu> b'focus=autofocus> b'4\xb8\x8b class="bg s_btn" autofocus> b'ews class="mnav">\xe6\x96\xb0\xe9\x97\xbb hao123 b'f=http://map.baidu.com name=tj_trmap class="mnav">\xe5\x9c\xb0\xe5\x9b\xbe
\xe9\xa6\x88 \xe4\xba\xacICP\xe8\xaf\x81030173\xe5\x8f\xb7 ' \
b'\r\n'
def get_ali(url):# print(url)# res=requests.get(url)# print(res)# print(res.status_code)# print(res.text)res=requests.get(url)if res.status_code==200:return res.content.decode("utf-8"),url,res.textdef show(args):cont,url,text=argsprint(cont,text)print(url,len(cont))if __name__=="__main__":ret=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(5)for url in ret:p.apply_async(get_ali,args=(url,),callback=show)p.close()p.join()
使用多进程请求多个url来减少网络等待浪费的时间
from multiprocessing import Pool import requests import json import osdef get_page(url):print('<进程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<进程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '\r\n...',...}] '''
import re from urllib.request import urlopen from multiprocessing import Pooldef get_page(url,pattern):response=urlopen(url).read().decode('utf-8')return pattern,responsedef parse_page(info):pattern,page_content=infores=re.findall(pattern,page_content)for item in res:dic={'index':item[0].strip(),'title':item[1].strip(),'actor':item[2].strip(),'time':item[3].strip(),}print(dic) if __name__ == '__main__':regex = r'.*?<.*?class="board-index.*?>(\d+).*?title="(.*?)".*?class="movie-item-info".*? 'pattern1=re.compile(regex,re.S)url_dic={'http://maoyan.com/board/7':pattern1,}p=Pool()res_l=[]for url,pattern in url_dic.items():res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)res_l.append(res)for i in res_l:i.get(){'index': '1', 'title': '绝杀慕尼黑', 'actor': '主演:弗拉基米尔·马什科夫,约翰·萨维奇,伊万·科列斯尼科夫', 'time': '上映时间:2019-06-13'} {'index': '2', 'title': '千与千寻', 'actor': '主演:柊瑠美,周冬雨,入野自由', 'time': '上映时间:2019-06-21'} {'index': '3', 'title': '命运之夜——天之杯II :迷失之蝶', 'actor': '主演:杉山纪彰,下屋则子,神谷浩史', 'time': '上映时间:2019-07-12'} {'index': '4', 'title': '玩具总动员4', 'actor': '主演:汤姆·汉克斯,蒂姆·艾伦,安妮·波茨', 'time': '上映时间:2019-06-21'} {'index': '5', 'title': '扫毒2天地对决', 'actor': '主演:刘德华,古天乐,苗侨伟', 'time': '上映时间:2019-07-05'} {'index': '6', 'title': '蜘蛛侠:英雄远征', 'actor': '主演:汤姆·赫兰德,杰克·吉伦哈尔,塞缪尔·杰克逊', 'time': '上映时间:2019-06-28'} {'index': '7', 'title': '爱宠大机密2', 'actor': '主演:帕顿·奥斯瓦尔特,冯绍峰,凯文·哈特', 'time': '上映时间:2019-07-05'} {'index': '8', 'title': '狮子王', 'actor': '主演:唐纳德·格洛弗,塞斯·罗根,詹姆斯·厄尔·琼斯', 'time': '上映时间:2019-07-12'} {'index': '9', 'title': '机动战士高达NT', 'actor': '主演:榎木淳弥,村中知,松浦爱弓', 'time': '上映时间:2019-07-12'} {'index': '10', 'title': '最好的我们', 'actor': '主演:陈飞宇,何蓝逗,惠英红', 'time': '上映时间:2019-06-06'}(.*?)
.*?(.*?)
6. 进程池返回值
异步获取返回值
def fun (i):# time.sleep(1)return i*i if __name__ == '__main__':p=Pool(5)for i in range(6):res=p.apply_async(fun,args=(i,)) # # print(res)print(res.get()) #等待计算结果 阻塞获取结果 等待下次循环 这里相当于join# print(res.get())获取到值但是和同步一样 但是我们这里是异步 所以异步get阻塞
解决方案
import os import time import random from multiprocessing import Pool # 异步获取返回值 表示五个五个返回数据 def fun (i):time.sleep(1)return i*i if __name__ == '__main__':p=Pool(5)ret=[]for i in range(22):res=p.apply_async(fun,args=(i,)) # ret.append(res) #等待计算结果 阻塞获取结果 等待下次循环 这里相当于joinfor s in ret:print( s.get()) # 异步获取返回值 表示五个五个返回数据
异步获取返回值 map
# map 自带join方法 close 方法 map 会先把每一个计算结果 添加到列表中 在返回 列表 def fun (i):time.sleep(1)return i*i if __name__ == '__main__':p=Pool(5)res=p.map(fun,range(5)) # print(res) # [0, 1, 4, 9, 16]
同步获取返回值 def fun (i):return i*i if __name__ == '__main__':p=Pool(5)for i in range(5):res=p.apply(fun,args=(i,)) # apply结果就是fun的返回值print(res)# 0 # 1 # 4 # 9 # 16
转载于:https://www.cnblogs.com/Sup-to/p/11195123.html
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
