线程补充、线程/进程池、协程

6、死锁,递归锁,互斥锁

死锁:加了一把锁,但是没有与解锁对应或出现了间隔又通过一把锁间隔了而没有对应的解锁,产生程序阻塞无法运行,所以产生了死锁

互斥锁:加一把锁就对应解一把锁,就是互斥锁

递归锁:递归锁专门用于解决死锁,但只是一种应急处理方法,临时用于快速解决项目因死锁问题不能正常运行的场景,用于处理异常死锁,和互斥锁一样都可以使用 with 的形式加锁。

注:不到万不得已的特殊情况不能使用锁的循环嵌套

# 死锁
noodle_lock = Lock()
kuaizi_lock = Lock()
def eat1(name):noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))noodle_lock.release()print("%s 放下面条了啊!!" % (name))def eat2(name):kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)noodle_lock.release()print("%s 放下面条了啊!!" % (name))kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))if __name__ == "__main__":name_lst1 = ["王振","王赢钱"]name_lst2 = ["李天照","魏小林"]for name in name_lst1:Thread(target=eat1,args=(name,)).start()for name in name_lst2:Thread(target=eat2,args=(name,)).start()# 互斥锁
mylock = Lock()
def eat1(name):mylock.acquire()print("%s 拿到面条啦!!" % (name))print("%s 拿到筷子啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)	print("%s 放下筷子了啊~" % (name))	print("%s 放下面条了啊!!" % (name))mylock.release()def eat2(name):mylock.acquire()print("%s 拿到筷子啦!!" % (name))print("%s 拿到面条啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)	print("%s 放下面条了啊!!" % (name))print("%s 放下筷子了啊~" % (name))mylock.release()if __name__ == "__main__":name_lst1 = ["王振","王赢钱"]name_lst2 = ["李天照","魏小林"]for name in name_lst1:Thread(target=eat1,args=(name,)).start()for name in name_lst2:Thread(target=eat2,args=(name,)).start()# 递归锁
rlock = RLock() # 递归锁
lock = Lock() # 互斥锁
def func():"""死锁: 只上锁不解锁是死锁.lock.acquire()lock.acquire()print(333)lock.release()lock.release()"""rlock.acquire()rlock.acquire()print(111)rlock.release()rlock.release()print(222)
func()# 使用递归锁解决死锁
noodle_lock = kuaizi_lock = RLock()
def eat1(name):noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))noodle_lock.release()print("%s 放下面条了啊!!" % (name))def eat2(name):kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)noodle_lock.release()print("%s 放下面条了啊!!" % (name))kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))if __name__ == "__main__":name_lst1 = ["王振","王赢钱"]name_lst2 = ["李天照","魏小林"]for name in name_lst1:Thread(target=eat1,args=(name,)).start()for name in name_lst2:Thread(target=eat2,args=(name,)).start()

7、Event事件

​ wait:动态加阻塞

​ clear:将阻塞事件的值改成False

​ set:将阻塞事件的值改成True

​ is_set:获取阻塞事件的值

# 模拟连接远程数据库
"""连接三次数据库,如果都不成功,直接抛出异常"""
def check(e):# 模拟网络延迟time.sleep(random.randrange(1,6))# 开始检测链接合法性print("开始检测链接合法性")e.set()def connect(e):sign = Falsefor i in range(1,4):# 设置最大等待时间 1e.wait(1)if e.is_set():print("数据库链接成功 ... ")sign = Truebreakelse:print("尝试链接数据%s次失败" % (i) )if sign == False:# 主动抛出异常raise TimeoutError	e = Event()# 线程1负责执行检测任务
Thread(target=check,args=(e,)).start()
# 线程2负责执行连接任务
Thread(target=connect,args=(e,)).start()

8、线程队列

常用的线程队列有 queue、LifoQueue、PriorityQueue

queue:先进先出

LifoQueue:后进后出(按照栈的特点设计的)

PriorityQueue:按照优先级排序(从小到大);要么全是数字,要么全是字符串,不能混合,混合就报错:error

put:向队列中存,村的超出长度就阻塞

get:在队列中取,取得超出长度就阻塞

put_nowait:存,超出了队列长度,报错

get_nowait:取,没数据取不出来,报错

# (1) Queue
q = Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# 取不出来,阻塞
# print(q.get())
print(q.get_nowait())q2 = Queue(3)
q2.put(11)
q2.put(22)
q2.put(33)
# 放不进去了,阻塞
# q2.put(44)
q2.put_nowait(44)# (2)LifoQueue
from queue import LifoQueue
lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)
# print(lq.put_nowait(444))print(lq.get())
print(lq.get())
print(lq.get())# (3)PriorityQueue 
from queue import PriorityQueue# 数字,默认从小到大排序
pq = PriorityQueue()
pq.put(13)
pq.put(3)
pq.put(20)
print(pq.get())
print(pq.get())
print(pq.get())# 字符串按照ascii编码排序
pq1 = PriorityQueue()
pq1.put("chinese")
pq1.put("america")
pq1.put("latinos")
pq1.put("blackman")print(pq1.get())
print(pq1.get())
print(pq1.get())
print(pq1.get())pq3 = PriorityQueue()
# 元组中的第一个元素排序
pq3.put( (20,"wangwen") )
pq3.put( (18,"wangzhen") )
pq3.put( (30,"weiyilin") )
pq3.put( (40,"xiechen") )print(pq3.get())
print(pq3.get())
print(pq3.get())
print(pq3.get())

9、线程池和进程池(改良版)

注: 线程池是由子线程实现的;进程池是由主进程实现的

线程池/进程池:

​ 实例化线程池:ThreadPoolExcutor (推荐5*cpu_count)

​ 异步提交任务:submit / map

​ 阻塞直到任务完成:shutdown

​ 获取子线程的返回值:result

​ 使用回调函数:add_done_callback

默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源

回调函数:

​ 就是一个参数,将这个函数作为参数传到另一个函数里面.
​ 函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数

功能:
打印状态: a属性
支付状态: b属性
退款状态: c属性
转账的状态: d属性
把想要的相关成员或者相关逻辑写在自定义的函数中
支付宝接口在正常执行之后,会调用自定义的函数,来执行相应的逻辑
那么这个函数就是回调函数

''' 进程池和线程池 '''
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
def func(i):print("任务执行中... start" , os.getpid())time.sleep(10)print("任务结束... end" , i)return i# (1) ProcessPoolExecutor 进程池基本使用
'''
"""默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源"""
if __name__ == "__main__":lst = []# print(os.cpu_count()) # 8 cpu逻辑核心数# (1) 创建进程池对象"""进程池里面最多创建os.cpu_count()这么多个进程,所有任务全由这几个进程完成,不会额外创建进程"""p = ProcessPoolExecutor()# (2) 异步提交任务for i in range(10):res = p.submit(func,i)lst.append(res)		# (3) 获取当前进程池返回值# for i in lst:# print(i.result())# (4) 等待所有子进程执行结束p.shutdown()  # join	print("主程序执行结束....")
'''# (2) ThreadPoolExecutor 线程池的基本用法
'''
"""默认如果一个线程短时间内可以完成更多的任务,就不会创建额外的新的线程,以节省资源"""
from threading import current_thread as cthread
def func(i):print("thread ... start" , cthread().ident,i)time.sleep(3)print("thread ... end" , i )	return cthread().identif __name__ == "__main__":lst = []setvar = set()# (1) 创建线程池对象"""限制线程池最多创建os.cpu_count() * 5 = 线程数,所有任务全由这几个线程完成,不会额外创建线程"""tp = ThreadPoolExecutor()# 我的电脑40个线程并发# (2) 异步提交任务for i in range(100):res = tp.submit(func,i)lst.append(res)# (3) 获取返回值for i in lst:setvar.add(i.result())# (4) 等待所有子线程执行结束tp.shutdown()print(len(setvar) , setvar)print("主线程执行结束 ... ")
'''# (3) 线程池 map
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread
from collections import Iterator
def func(i):# print("thread start ... ",cthread().ident)# print("thread end ... ",i)time.sleep(0.5)return "*" * i
if __name__ == "__main__":setvar = set()lst = []tp = ThreadPoolExecutor(5)# map(自定义函数,可迭代性数据) 可迭代性数据(容器类型数据,迭代器,range对象)it = tp.map(func,range(20))# 判定返回值是否是迭代器print(isinstance(it,Iterator))tp.shutdown()	for i in it:	print(i)# add_done_callback 原型
class Ceshi():def add_done_callback(self,func):print("执行操作1 ... ")print("执行操作2 ... ")func(self)def result(self):return 123456def call_back3(obj):print(obj)print(obj.result())obj = Ceshi()
obj.add_done_callback(call_back3)from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread as cthread
import os,timedef func1(i):print("Process start ... ",os.getpid())time.sleep(0.5)print("Process end ... ",i)return "*" * idef func2(i):print("thread start ... ",cthread().ident)time.sleep(0.5)print("thread end ... ",i)return "*" * idef call_back1(obj):print("<==回调函数callback进程号:===>",os.getpid())print(obj.result())def call_back2(obj):print("<==回调函数callback线程号:===>",cthread().ident)print(obj.result())# (1) 进程池的回调函数: 由主进程执行调用完成
"""
if __name__ == "__main__":p = ProcessPoolExecutor(5)for i in range(1,11):res = p.submit(func1,i)# 进程对象.add_done_callback(回调函数) '''add_done_callback 可以把res本对象和回调函数自动传递到函数里来'''res.add_done_callback(call_back1)p.shutdown()print("主进程执行结束 ... " , os.getpid())
"""# (2) 线程池的回调函数: 由当前子线程执行调用完成
if __name__ == "__main__":tp = ThreadPoolExecutor(5)for i in range(1,11):res = tp.submit(func2,i)# 进程对象.add_done_callback(回调函数) '''add_done_callback 可以把res本对象和回调函数自动传递到函数里来'''res.add_done_callback(call_back2)tp.shutdown()print("主线程执行结束 ... " , cthread().ident)

廿四、协程

1、协程也叫纤程:

​ 协程是线程的一种实现

​ 指的是一条线程能够在多任务之间来回切换的一种实现.
​ 对于CPU、操作系统来说,协程并不存在.
​ 任务之间的切换会花费时间.
​ 目前电脑配置一般线程开到200会阻塞卡顿.

2、协程的实现

​ 协程帮助你记住哪个任务执行到哪个位置上了,并且实现安全的切换
​ 一个任务一旦阻塞卡顿,立刻切换到另一个任务继续执行,保证线程总是忙碌的,更加充分的利用CPU,抢占更多的时间片

​ 一个线程可以由多个协程来实现,协程之间不会产生数据安全问题

3、协程模块

greenlet gevent的底层,协程,切换的模块

gevent 直接用的,gevent能提供更全面的功能

# (1) 用协程改成一下生产者消费者模型
"""
def producer():# 数据范围0 ~ 999for i in range(1000):yield idef counsumer(gen):for i in range(10):print(next(gen))# 初始化生成器函数
gen = producer()	
counsumer(gen)
counsumer(gen)
counsumer(gen)
"""# (2) 协程的具体实现
"""
switch 遇到阻塞时,只能手动调用该函数进行函数切换,不能自动实现切换,来规避io阻塞;
"""
from greenlet import greenlet
import time"""
def eat():print("eat 1")g2.switch()time.sleep(3)print("eat 2")def play():print("play one")	time.sleep(3)print("play two")g1.switch()g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()
"""# (3) gevent 
"""gevent 可以自动切换,但是不能够自动识别time.sleep这样的阻塞"""
import gevent
"""
def eat():print("eat 1")time.sleep(3)print("eat 2")def play():print("play one")	time.sleep(3)print("play two")# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 阻塞,必须g1协程执行完毕为止
g1.join()	
# 阻塞,必须gg协程执行完毕为止
g2.join()print("主线程执行完毕 ... ")
"""
# (4) gevent.time 添加阻塞,让他实现自动切换
"""
def eat():print("eat 1")gevent.sleep(3)print("eat 2")def play():print("play one")	gevent.sleep(3)print("play two")# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 阻塞,必须g1协程执行完毕为止
g1.join()	
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")
"""# (5) 终极大招 彻底解决不识别阻塞的问题
from gevent import monkey
monkey.patch_all() # 把下面所有引入的模块中的阻塞识别一下
import time
import gevent def eat():print("eat 1")time.sleep(3)print("eat 2")def play():print("play one")	time.sleep(3)print("play two")# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 阻塞,必须g1协程执行完毕为止
g1.join()	
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")

4、协程的例子

(1) spawn(函数,参数1,参数2,数3 … ) 启动协程

(2) join 阻塞,直到某个协程任务执行完毕之后,再放行

(3) joinall 等待所有协程任务都执行完毕之后,在放行

g1.join()  g2.join()   <=>  gevent.joinall( [g1,g2] )(推荐:比较简洁)

(4) value 获取协程任务中的返回值 g1.value g2.value 获取对应协程中的返回值

(5) 注:python中语句和语句之间可以用分好;隔开写在一行;

# (1) 协程的其他方法
# 把下面所有引入的模块中的阻塞识别一下
"""
from gevent import monkey ; monkey.patch_all() import time
import gevent def eat():print("eat 1")time.sleep(3)print("eat 2")return "吃完了"def play():print("play one")	time.sleep(3)print("play two")return "玩完了"# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 默认:主线程不会等待所有的协程都执行完毕就会终止程序
# 等待g1 g2协程任务完毕之后再向下执行;
gevent.joinall( [g1,g2] ) # 一个列表参数
print("主线程执行结束 ... ")print(g1.value)
print(g2.value)
"""# (2) 利用协程爬取数据
"""requests 抓取页面数据模块""""""
HTTP 状态码200 ok404 not found400 bad request
"""# 基本语法
from gevent import monkey ; monkey.patch_all()
import time
import gevent
import requests
"""
response = requests.get("http://www.baidu.com")
print(response)
# 获取状态码
print( response.status_code )
# 获取网页中的字符编码
res = response.apparent_encoding
print( res )
# 设置编码集,防止乱码
response.encoding = res
# 获取网页当中的内容
res = response.text
print(res)
"""url_list = [
"http://www.baidu.com",
"http://www.4399.com/",
"http://www.7k7k.com/",
"http://www.taobao.com/",
"http://www.jingdong.com/"]
def get_url(url):response = requests.get(url)if response.status_code == 200:# print(response.text)time.sleep(0.1)
# (1) 正常爬取
'''
starttime = time.time()
for i in url_list:get_url(i)
endtime = time.time()
print("执行时间:" ,endtime - starttime ) # 18.780214071273804
'''
"""
import re
strvar = '# (2) 用协程的方式爬取数据lst = []
starttime = time.time()
for i in url_list:g = gevent.spawn(get_url,i)lst.append(g)gevent.joinall(lst)	endtime = time.time()print("执行时间:" ,endtime - starttime )


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部