612

1.死锁,互斥锁,递归锁

互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性
注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
1) 死锁

noodle_lock = Lock()
kuaizi_lock = Lock()
def eat1(name):noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))noodle_lock.release()print("%s 放下面条了啊!!" % (name))def eat2(name):kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)noodle_lock.release()print("%s 放下面条了啊!!" % (name))kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))if __name__ == "__main__":name_lst1 = ["啊1","啊2"]name_lst2 = ["啊3","啊4"]for name in name_lst1:Thread(target=eat1,args=(name,)).start()for name in name_lst2:Thread(target=eat2,args=(name,)).start()

2) 递归锁
    递归锁专门用来解决死锁现象,
    临时用于快速解决项目因死锁问题不能正常运行的场景
    用来处理异常死锁的

rlock = RLock() # 递归锁
lock = Lock() # 互斥锁
def func():"""死锁: 只上锁不解锁是死锁.lock.acquire()lock.acquire()print(333)lock.release()lock.release()"""rlock.acquire()rlock.acquire()print(111)rlock.release()rlock.release()print(222)
func()

3) 用递归锁解决死锁

noodle_lock = kuaizi_lock = RLock()
def eat1(name):noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))noodle_lock.release()print("%s 放下面条了啊!!" % (name))def eat2(name):kuaizi_lock.acquire()print("%s 拿到筷子啦!!" % (name))noodle_lock.acquire()print("%s 拿到面条啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)noodle_lock.release()print("%s 放下面条了啊!!" % (name))kuaizi_lock.release()print("%s 放下筷子了啊~" % (name))if __name__ == "__main__":name_lst1 = ["啊1","啊2"]name_lst2 = ["啊3","啊4"]for name in name_lst1:Thread(target=eat1,args=(name,)).start()for name in name_lst2:Thread(target=eat2,args=(name,)).start()

4) 互斥锁
尽量使用一把锁解决问题,不要互相嵌套,否则容易死锁

mylock = Lock()
def eat1(name):mylock.acquire()print("%s 拿到面条啦!!" % (name))print("%s 拿到筷子啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)	print("%s 放下筷子了啊~" % (name))	print("%s 放下面条了啊!!" % (name))mylock.release()def eat2(name):mylock.acquire()print("%s 拿到筷子啦!!" % (name))print("%s 拿到面条啦!!" % (name))print("开始享受这碗面条.. ")time.sleep(0.5)	print("%s 放下面条了啊!!" % (name))print("%s 放下筷子了啊~" % (name))mylock.release()if __name__ == "__main__":name_lst1 = ["啊1","啊2"]name_lst2 = ["啊3","啊4"]for name in name_lst1:Thread(target=eat1,args=(name,)).start()for name in name_lst2:Thread(target=eat2,args=(name,)).start()

2.事件 Event

e = Event()# wait   动态加阻塞# clear  将阻塞事件的值改成False# set    将阻塞事件的值改成True# is_set 获取阻塞事件的值

1) 基本语法

e = Event()
print(e.is_set())
e.set()
e.clear()
e.wait(3)
print("程序在运行 ... ")

2) 模拟连接远程数据库
连接三次数据库,如果都不成功,直接抛出异常

def check(e):# 模拟网络延迟time.sleep(random.randrange(1,6))# 开始检测链接合法性print("开始检测链接合法性")e.set()def connect(e):sign = Falsefor i in range(1,4):# 设置最大等待时间 1e.wait(1)if e.is_set():print("数据库链接成功 ... ")sign = Truebreakelse:print("尝试链接数据%s次失败" % (i) )if sign == False:# 主动抛出异常raise TimeoutError	e = Event()# 线程1负责执行检测任务
Thread(target=check,args=(e,)).start()
# 线程2负责执行连接任务
Thread(target=connect,args=(e,)).start()

3.线程队列

put 存
get 取
put_nowait 存,超出了队列长度,报错
get_nowait 取,没数据取不出来,报错linux windows 线程中put_nowait,get_nowait都支持

1) Queue
先进先出,后进后出

q = Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# 取不出来,阻塞
# print(q.get())
print(q.get_nowait())
q2 = Queue(3)
q2.put(11)
q2.put(22)
q2.put(33)
# 放不进去了,阻塞
# q2.put(44)
q2.put_nowait(44)

2)LifoQueue 先进后出,后进先出(按照栈的特点设计)

from queue import LifoQueue
lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)
# print(lq.put_nowait(444))print(lq.get())
print(lq.get())
print(lq.get())

3)PriorityQueue 按照优先级顺序排序 (默认从小到大排序)

from queue import PriorityQueue# 如果都是数字,默认从小到大排序
pq = PriorityQueue()
pq.put(13)
pq.put(3)
pq.put(20)
print(pq.get())
print(pq.get())
print(pq.get())# 如果都是字符串
"""如果是字符串,按照ascii编码排序"""
pq1 = PriorityQueue()
pq1.put("chinese")
pq1.put("america")
pq1.put("latinos")
pq1.put("blackman")print(pq1.get())
print(pq1.get())
print(pq1.get())
print(pq1.get())# 要么全是数字,要么全是字符串,不能混合 error
"""
pq2 = PriorityQueue()
pq2.put(13)
pq2.put("aaa")
pq2.put("拟稿")
"""pq3 = PriorityQueue()
# 默认按照元组中的第一个元素排序
pq3.put( (20,"wangwen") )
pq3.put( (18,"wangzhen") )
pq3.put( (30,"weiyilin") )
pq3.put( (40,"xiechen") )print(pq3.get())
print(pq3.get())
print(pq3.get())
print(pq3.get())

4.进程池和线程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
def func(i):print("任务执行中... start" , os.getpid())time.sleep(10)print("任务结束... end" , i)return i

1) ProcessPoolExecutor 进程池基本使用

"""默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源"""
if __name__ == "__main__":lst = []# print(os.cpu_count()) # 8 cpu逻辑核心数# (1) 创建进程池对象"""进程池里面最多创建os.cpu_count()这么多个进程,所有任务全由这几个进程完成,不会额外创建进程"""p = ProcessPoolExecutor()# (2) 异步提交任务for i in range(10):res = p.submit(func,i)lst.append(res)		# (3) 获取当前进程池返回值# for i in lst:# print(i.result())# (4) 等待所有子进程执行结束p.shutdown()  # join	print("主程序执行结束....")

2) ThreadPoolExecutor 线程池的基本用法

"""默认如果一个线程短时间内可以完成更多的任务,就不会创建额外的新的线程,以节省资源"""
from threading import current_thread as cthread
def func(i):print("thread ... start" , cthread().ident,i)time.sleep(3)print("thread ... end" , i )	return cthread().identif __name__ == "__main__":lst = []setvar = set()# (1) 创建线程池对象"""限制线程池最多创建os.cpu_count() * 5 = 线程数,所有任务全由这几个线程完成,不会额外创建线程"""tp = ThreadPoolExecutor()# 我的电脑40个线程并发# (2) 异步提交任务for i in range(100):res = tp.submit(func,i)lst.append(res)# (3) 获取返回值for i in lst:setvar.add(i.result())# (4) 等待所有子线程执行结束tp.shutdown()print(len(setvar) , setvar)print("主线程执行结束 ... ")

3) 线程池 map

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread
from collections import Iterator
def func(i):# print("thread start ... ",cthread().ident)# print("thread end ... ",i)time.sleep(0.5)return "*" * i
if __name__ == "__main__":setvar = set()lst = []tp = ThreadPoolExecutor(5)# map(自定义函数,可迭代性数据) 可迭代性数据(容器类型数据,迭代器,range对象)it = tp.map(func,range(20))# 判定返回值是否是迭代器print(isinstance(it,Iterator))tp.shutdown()	for i in it:	print(i)

5.回调函数
回调函数:
    把函数当成参数传递给另外一个函数
    在当前函数执行完毕之后,最后调用一下该参数(函数),这个函数就是回调函数
功能:
    打印状态:   a属性
    支付状态:   b属性
    退款状态:   c属性
    转账的状态: d属性
    把想要的相关成员或者相关逻辑写在自定义的函数中
    支付宝接口在正常执行之后,会调用自定义的函数,来执行相应的逻辑
    那么这个函数就是回调函数

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread as cthread
import os,timedef func1(i):print("Process start ... ",os.getpid())time.sleep(0.5)print("Process end ... ",i)return "*" * idef func2(i):print("thread start ... ",cthread().ident)time.sleep(0.5)print("thread end ... ",i)return "*" * idef call_back1(obj):print("<==回调函数callback进程号:===>",os.getpid())print(obj.result())def call_back2(obj):print("<==回调函数callback线程号:===>",cthread().ident)print(obj.result())

1) 进程池的回调函数: 由主进程执行调用完成

if __name__ == "__main__":p = ProcessPoolExecutor(5)for i in range(1,11):res = p.submit(func1,i)# 进程对象.add_done_callback(回调函数) '''add_done_callback 可以把res本对象和回调函数自动传递到函数里来'''res.add_done_callback(call_back1)p.shutdown()print("主进程执行结束 ... " , os.getpid())

2) 线程池的回调函数: 由当前子线程执行调用完成

if __name__ == "__main__":tp = ThreadPoolExecutor(5)for i in range(1,11):res = tp.submit(func2,i)# 进程对象.add_done_callback(回调函数) '''add_done_callback 可以把res本对象和回调函数自动传递到函数里来'''res.add_done_callback(call_back2)tp.shutdown()print("主线程执行结束 ... " , cthread().ident)

1)add_done_callback 原型

class Ceshi():def add_done_callback(self,func):print("执行操作1 ... ")print("执行操作2 ... ")func(self)def result(self):return 123456def call_back3(obj):print(obj)print(obj.result())obj = Ceshi()
obj.add_done_callback(call_back3)

6.协程

1) 用协程改成一下生产者消费者模型

def producer():# 数据范围0 ~ 999for i in range(1000):yield idef counsumer(gen):for i in range(10):print(next(gen))# 初始化生成器函数
gen = producer()	
counsumer(gen)
counsumer(gen)
counsumer(gen)

2) 协程的具体实现

switch 遇到阻塞时,只能手动调用该函数进行函数切换,不能自动实现切换,来规避io阻塞

def eat():print("eat 1")g2.switch()time.sleep(3)print("eat 2")def play():print("play one")	time.sleep(3)print("play two")g1.switch()g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()

3) gevent 

import gevent
def eat():print("eat 1")time.sleep(3)print("eat 2")def play():print("play one")	time.sleep(3)print("play two")# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 阻塞,必须g1协程执行完毕为止
g1.join()	
# 阻塞,必须gg协程执行完毕为止
g2.join()print("主线程执行完毕 ... ")

4) gevent.time 添加阻塞,让他实现自动切换

def eat():print("eat 1")gevent.sleep(3)print("eat 2")def play():print("play one")	gevent.sleep(3)print("play two")# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 阻塞,必须g1协程执行完毕为止
g1.join()	
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")

5) 终极大招 彻底解决不识别阻塞的问题

from gevent import monkey
monkey.patch_all() # 把下面所有引入的模块中的阻塞识别一下
import time
import gevent def eat():print("eat 1")time.sleep(3)print("eat 2")def play():print("play one")	time.sleep(3)print("play two")# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 阻塞,必须g1协程执行完毕为止
g1.join()	
# 阻塞,必须gg协程执行完毕为止
g2.join()
print("主线程执行完毕 ... ")

7.协程的例子

# (1) spawn(函数,参数1,参数2,参数3 .... )  启动协程
# (2) join 阻塞,直到某个协程任务执行完毕之后,再放行
# (3) joinall 等待所有协程任务都执行完毕之后,在放行
      g1.join()  g2.join()   <=>  gevent.joinall( [g1,g2] )(推荐:比较简洁)
# (4) value 获取协程任务中的返回值 g1.value g2.value 获取对应协程中的返回值  

# 语句和语句之间可以用分好;隔开写在一行;

1) 协程的其他方法
# 把下面所有引入的模块中的阻塞识别一下

from gevent import monkey ; monkey.patch_all() import time
import gevent def eat():print("eat 1")time.sleep(3)print("eat 2")return "吃完了"def play():print("play one")	time.sleep(3)print("play two")return "玩完了"# 利用gevent.spawn创建协程对象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn创建协程对象g2
g2 = gevent.spawn(play)# 默认:主线程不会等待所有的协程都执行完毕就会终止程序
# 等待g1 g2协程任务完毕之后再向下执行;
gevent.joinall( [g1,g2] ) # 一个列表参数
print("主线程执行结束 ... ")print(g1.value)
print(g2.value)

2) 利用协程爬取数据
requests 抓取页面数据模块

HTTP 状态码200 ok404 not found400 bad request

基本语法

from gevent import monkey ; monkey.patch_all()
import time
import gevent
import requests
"""
response = requests.get("http://www.baidu.com")
print(response)
# 获取状态码
print( response.status_code )
# 获取网页中的字符编码
res = response.apparent_encoding
print( res )
# 设置编码集,防止乱码
response.encoding = res
# 获取网页当中的内容
res = response.text
print(res)

2) 用协程的方式爬取数据

lst = []
starttime = time.time()
for i in url_list:g = gevent.spawn(get_url,i)lst.append(g)gevent.joinall(lst)	endtime = time.time()print("执行时间:" ,endtime - starttime ) # 执行时间: 2.3307271003723145"""
利用好多进程,多线程,多协程可以让服务期运行速度更快,
并且也可以抗住更多用户的访问;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部