九. 并发编程 (进程池)

一 .进程池(multiprocess.Pool)

1.进程池概念

为什么要有进程池?进程池的概念。在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先
,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据
任务开启或者结束进程。那么我们要怎么做呢?在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将
进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,
池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
Pool([numprocess  [,initializer [, initargs]]]):创建进程池1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

import os
import time
import random
from multiprocessing import Pool
多进程和进程
from
multiprocessing import Pool,Proces def run(n):for i in range(10):print(n+1)if __name__ == '__main__':
进程池p
=Pool(5) # 参数表示5个进程 (类似于5个cppu)p.map(run, range(50)) # 50任务# 等同于上面只是时间上的差异 进程池更快 下面这个要慢带你 多进程for i in range(50):Process(target=run,args=(i,)).start()
 
进程池和多进程效率对比
 注意:map : 这个map在这里自带了join方法 不需要人为添加
def run(n):for i in  range(10):print(n+1)
if __name__ == '__main__':

进程池str1
=time.time()p=Pool(5) # 参数表示5个进程 (类似于5个cppu)p.map(run, range(50)) # 50任务 这个map在这里自带了join方法 不需要人为添加t1=time.time()-str1 # 0.26728272438049316 进程池执行时间 多进程str2 = time.time()ret=[]for i in range(50):p1=Process(target=run,args=(i,))ret.append(p1)p1.start()for i in ret:i.join()t2=time.time()-str2 # 2.7745769023895264 没有使用进程池执行时间print(t1,t2)



 
map进程池传参数
# map : 这个map在这里自带了join方法 不需要人为添加
def run(n):for i in  range(10):print(n+1)def run1(n):print(n)     # ('aa', 100)print(n[1])  # 100if __name__ == '__main__':p=Pool(5)       # 参数表示5个进程 (类似于5个cppu)p.map(run, range(5))  # 50任务          这个map在这里自带了join方法   不需要人为添加
p.map(run1,[("aa",100)])

2.进程同步(apply)

# 同步
import os,time
from multiprocessing import Pooldef work(n):print("开始 run%s"%n,os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid())if __name__ == '__main__':p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(10):res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞# 但不管该任务是否存在阻塞,同步调用都会在原地等着print(res_l, "")
#
# 执行结果:
# 开始 run0 14036
# 结束 run0 14036
# 开始 run1 18312
# 结束 run1 18312
# 开始 run2 12744
# 结束 run2 12744
# 开始 run3 14036
# 结束 run3 14036
# 开始 run4 18312
# 结束 run4 18312
# 开始 run5 12744
# 结束 run5 12744
# 开始 run6 14036
# 结束 run6 14036
# 开始 run7 18312
# 结束 run7 18312
# 开始 run8 12744
# 结束 run8 12744
# 开始 run9 14036
# 结束 run9 14036
# [] 空
# 进程已结束,退出代码 0

3.进程异步( apply_async)

# 异步  带问题的
import os
import time
import random
from multiprocessing import Pool
def work(n):print("开始 run%s" % n, os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid())
if __name__ == '__main__':print("主进程")p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务for i in range(10):res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务# 注意这里是一个真异步 就是主进程不会等子进程结束 而是主进程执行完了不会管子进程  因为没有发感知进程池结束
# 执行结果
#     主进程
#     进程已结束,退出代码 0
# 异步
import os
import time
import random
from multiprocessing import Pool
def work(n):print("开始 run%s" % n, os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid())
if __name__ == '__main__':p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务for i in range(10):res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务p.close()    # 结束进程池接收任务p.join()     # 感知进程池中的任务结束# 执行结果
# 开始 run0 12284
# 开始 run1 4864
# 开始 run2 18452
# 结束 run0 12284
# 开始 run3 12284
# 结束 run1 4864
# 开始 run4 4864
# 结束 run2 18452
# 开始 run5 18452
# 结束 run3 12284
# 开始 run6 12284
# 结束 run4 4864
# 开始 run7 4864
# 结束 run5 18452
# 开始 run8 18452
# 结束 run6 12284
# 开始 run9 12284
# 结束 run7 4864
# 结束 run8 18452
# 结束 run9 12284
# 进程已结束,退出代码 0


import os
import time
import random
from multiprocessing import Pooldef work(n):print("开始 run%s" % n, os.getpid())time.sleep(1)print("结束 run%s" % n, os.getpid())
if __name__ == '__main__':print("主进程")p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(5):res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务# 需要注意的是,进程池中的三个进程不会同时开启或者同时结束# 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
        res_l.append(res)# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close()p.join()# print(res_l,"》》》》》》》》》》")for res in res_l:print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get# 执行结果
    主进程开始 run0 14684开始 run1 15180开始 run2 18996结束 run0 14684开始 run3 14684结束 run1 15180开始 run4 15180结束 run2 18996结束 run3 14684结束 run4 15180NoneNoneNoneNoneNone进程已结束,退出代码 0
from multiprocessing import Pool
import time,random,osdef work(n):time.sleep(0.5)return n**2
if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待进程池中所有进程执行完毕for aa in  res_l:print(aa.get())    # 获取进程池中所有数据# 0
# 1
# 4
# 9
# 16
# 25
# 36
# 49
# 64
# 81

 

# 如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。
from multiprocessing import Pool
import time,random,osdef work(n):time.sleep(1)return n**2
if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待进程池中所有进程执行完毕
nums=[]for res in res_l:nums.append(res.get()) #拿到所有结果print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

4.进程池版 socket

server

#
Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程 from socket import * from multiprocessing import Pool import osserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn):print('进程pid: %s' %os.getpid())while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool(4)while True:conn,*_=server.accept()p.apply_async(talk,args=(conn,))# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
client

from
socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))

 5. 进程池回调函数(主要用于爬虫)

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:
我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
# 先执行fun1函数 fun1函数的返回值  作为回调函数的参数  然后去执行回调函数
import os
import time
import random
from multiprocessing import Pool# 先执行fun1函数 fun1函数的返回值  作为回调函数的参数  然后去执行回调函数
def fun1(n):print("这是fun1函数",os.getpid())return  n*ndef fun2(aa):    #  参数只能回调哪个参数print("这是fun2函数",os.getpid())print(aa)if __name__ == '__main__':print("主进程",os.getpid())p=Pool(5)p.apply_async(fun1,args=(10,),callback=fun2)p.close()p.join()# 说明fun2回调函数 回到 主进程中调用

执行结果主进程 19268这是fun1函数 12740这是fun2函数 19268100进程已结束,退出代码 0
import os
import time
import random
from multiprocessing import Pool# 先执行fun1函数 fun1函数的返回值  作为回调函数的参数  然后去执行回调函数
def fun1(n):print("这是fun1函数",os.getpid())return  n*ndef fun2(aa):    #  参数只能回调哪个参数print("这是fun2函数",os.getpid())print(aa)if __name__ == '__main__':print("主进程",os.getpid())p=Pool(5)p.apply_async(fun1,args=(5,),callback=fun2)p.close()p.join()# 说明fun2回调函数 回到 主进程中调用
执行结果主进程 7164这是fun1函数 3272这是fun2函数 716425进程已结束,退出代码 0

 爬虫案例

import requests
from  multiprocessing import Poolaa=requests.get("https://www.baidu.com")
print(aa)
print(aa.status_code)
print(aa.text)
print("11111",aa.content)
print(aa.__dict__)   查看里面属性
print("**************************************")

执行结果

200

ç¾åº¦ä¸ä¸ï¼ä½ å°±ç¥é
ç»å½');
æ´å¤äº§å

å³äºç¾åº¦ About Baidu

©2017 Baidu ä½¿ç¨ç¾åº¦åå¿è¯»  æè§å馠京ICPè¯030173å· 



11111 b'\r\n b' http-equiv=X-UA-Compatible content=IE=Edge> b'href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css>\xe7\x99\xbe\xe5\xba\xa6\<br /> xe4\xb8\x80\xe4\xb8\x8b\xef\xbc\x8c\xe4\xbd\xa0\xe5\xb0\xb1\xe7\x9f\xa5\xe9\x81\x93 ' \
b' b'hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129>
b'u.com/s class="fm"> b'n name=f value=8> b'ame=tn value=baidu> b'focus=autofocus> b'4\xb8\x8b class="bg s_btn" autofocus>
b'ews class="mnav">\xe6\x96\xb0\xe9\x97\xbb hao123 b'f=http://map.baidu.com name=tj_trmap class="mnav">\xe5\x9c\xb0\xe5\x9b\xbe
\xe9\xa6\x88
 \xe4\xba\xacICP\xe8\xaf\x81030173\xe5\x8f\xb7  ' \
b'

\r\n'

def
get_ali(url):# print(url)# res=requests.get(url)# print(res)# print(res.status_code)# print(res.text)res=requests.get(url)if res.status_code==200:return res.content.decode("utf-8"),url,res.textdef show(args):cont,url,text=argsprint(cont,text)print(url,len(cont))if __name__=="__main__":ret=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(5)for url in ret:p.apply_async(get_ali,args=(url,),callback=show)p.close()p.join()
 
使用多进程请求多个url来减少网络等待浪费的时间

from
multiprocessing import Pool import requests import json import osdef get_page(url):print('<进程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<进程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '\r\n...',...}] '''
import re
from urllib.request import urlopen
from multiprocessing import Pooldef get_page(url,pattern):response=urlopen(url).read().decode('utf-8')return pattern,responsedef parse_page(info):pattern,page_content=infores=re.findall(pattern,page_content)for item in res:dic={'index':item[0].strip(),'title':item[1].strip(),'actor':item[2].strip(),'time':item[3].strip(),}print(dic)
if __name__ == '__main__':regex = r'
.*?<.*?class="board-index.*?>(\d+).*?title="(.*?)".*?class="movie-item-info".*?

(.*?)

.*?

(.*?)

'pattern1=re.compile(regex,re.S)url_dic={'http://maoyan.com/board/7':pattern1,}p=Pool()res_l=[]for url,pattern in url_dic.items():res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)res_l.append(res)for i in res_l:i.get(){'index': '1', 'title': '绝杀慕尼黑', 'actor': '主演:弗拉基米尔·马什科夫,约翰·萨维奇,伊万·科列斯尼科夫', 'time': '上映时间:2019-06-13'} {'index': '2', 'title': '千与千寻', 'actor': '主演:柊瑠美,周冬雨,入野自由', 'time': '上映时间:2019-06-21'} {'index': '3', 'title': '命运之夜——天之杯II :迷失之蝶', 'actor': '主演:杉山纪彰,下屋则子,神谷浩史', 'time': '上映时间:2019-07-12'} {'index': '4', 'title': '玩具总动员4', 'actor': '主演:汤姆·汉克斯,蒂姆·艾伦,安妮·波茨', 'time': '上映时间:2019-06-21'} {'index': '5', 'title': '扫毒2天地对决', 'actor': '主演:刘德华,古天乐,苗侨伟', 'time': '上映时间:2019-07-05'} {'index': '6', 'title': '蜘蛛侠:英雄远征', 'actor': '主演:汤姆·赫兰德,杰克·吉伦哈尔,塞缪尔·杰克逊', 'time': '上映时间:2019-06-28'} {'index': '7', 'title': '爱宠大机密2', 'actor': '主演:帕顿·奥斯瓦尔特,冯绍峰,凯文·哈特', 'time': '上映时间:2019-07-05'} {'index': '8', 'title': '狮子王', 'actor': '主演:唐纳德·格洛弗,塞斯·罗根,詹姆斯·厄尔·琼斯', 'time': '上映时间:2019-07-12'} {'index': '9', 'title': '机动战士高达NT', 'actor': '主演:榎木淳弥,村中知,松浦爱弓', 'time': '上映时间:2019-07-12'} {'index': '10', 'title': '最好的我们', 'actor': '主演:陈飞宇,何蓝逗,惠英红', 'time': '上映时间:2019-06-06'}

 6. 进程池返回值

 
异步获取返回值

def
fun (i):# time.sleep(1)return i*i if __name__ == '__main__':p=Pool(5)for i in range(6):res=p.apply_async(fun,args=(i,)) # # print(res)print(res.get()) #等待计算结果 阻塞获取结果 等待下次循环 这里相当于join# print(res.get())获取到值但是和同步一样 但是我们这里是异步 所以异步get阻塞

解决方案

import
os import time import random from multiprocessing import Pool # 异步获取返回值 表示五个五个返回数据 def fun (i):time.sleep(1)return i*i if __name__ == '__main__':p=Pool(5)ret=[]for i in range(22):res=p.apply_async(fun,args=(i,)) # ret.append(res) #等待计算结果 阻塞获取结果 等待下次循环 这里相当于joinfor s in ret:print( s.get()) # 异步获取返回值 表示五个五个返回数据
  
 


异步获取返回值 map

# map 自带join方法 close 方法    map 会先把每一个计算结果 添加到列表中   在返回 列表
def fun (i):time.sleep(1)return  i*i
if __name__ == '__main__':p=Pool(5)res=p.map(fun,range(5)) #
    print(res)
# [0, 1, 4, 9, 16]
   
 
同步获取返回值
def fun (i):return  i*i
if __name__ == '__main__':p=Pool(5)for i in  range(5):res=p.apply(fun,args=(i,))  #  apply结果就是fun的返回值print(res)# 0
# 1
# 4
# 9
# 16

 

转载于:https://www.cnblogs.com/Sup-to/p/11195123.html


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部