小黑准备收拾东西领物资的日常积累:线程与进程

课程来源:https://www.bilibili.com/video/BV1ma411q7me/?spm_id_from=333.337.top_right_bar_window_custom_collection.content.click&vd_source=0b1383084f626b5cf37be3e82e883106

多线程实例

'''
基于多线程对上述串行示例进行优化:一个工厂,创建一个车间,这个车间中创建 3个工人,并行处理任务。
一个程序,创建一个进程,这个进程中创建 3个线程,并行处理任务。
'''
import time
import requests
import threadingurl_list = [("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]def task(file_name,video_url):res = requests.get(video_url)with open(file_name,mode='wb') as f:f.write(res.content)print(time.time())for name,url in url_list:# 创建线程,让每个线程都执行task函数t = threading.Thread(target=task,args=(name,url))t.start()

1667533467.525886
1667533467.565978
1667533467.565978

多进程实例

import time
import requests
import threading
import multiprocessingurl_list = [("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]def task(file_name,video_url):res = requests.get(video_url)with open(file_name,mode='wb') as f:f.write(res.content)print(time.time())
if __name__ == '__main__':for name,url in url_list:# 创建进程,让每个进程都执行task函数t = multiprocessing.Process(target=task,args=(name,url))t.start()

1667533771.786621
1667533771.786621
1667533771.786621

得出结论:在这个任务中多进程比多线程开销大

串行

import time
if __name__ == '__main__':start = time.time()result = 0for i in range(100000000):result += iprint(result)end = time.time()print('耗时:',end-start)

4999999950000000
耗时: 6.52695894241333

多进程处理

import time
import multiprocessingdef task(start,end,queue):result = 0for i in range(start,end):result += iqueue.put(result)if __name__ == '__main__':queue = multiprocessing.Queue()start_time = time.time()p1 = multiprocessing.Process(target = task,args = (0,50000000,queue))p1.start()p2 = multiprocessing.Process(target=task,args=(50000000,100000000,queue))p2.start()v1 = queue.get(block = True)v2 = queue.get(block = True)print(v1 + v2)end_time = time.time()print('耗时:',end_time - start_time)

4999999950000000
耗时: 2.2204787731170654

Process finished with exit code 0

多进程与多线程同时使用

import multiprocessing
import threadingdef thread_task():passdef task(start,end):t1 = threading.Thread(target = thread_task)t1.start()t2 = threading.Thread(target = thread_task)t2.start()t3 = threading.Thread(target = thread_task)t3.start()if __name__ == '__main__':p1 = multiprocessing.Process(target = task,args=(0,50000000))p1.start()p2 = multiprocessing.Process(target = task,args = (50000000,100000000))p2.start()

多线程开发

import threadingnumber = 0def _add():global numberfor i in range(10000000):number += 1t = threading.Thread(target = _add)
t.start()
t.join()    # 主进程等待中...
print(number)

10000000

import threadingnumber = 0def _add():global numberfor i in range(10000000):number += 1def _sub():global numberfor i in range(10000000):number -= 1t1 = threading.Thread(target = _add)
t2 = threading.Thread(target = _sub)
t1.start()
t1.join()  # t1线程执行完毕,才能继续往后走
t2.start()
t2.join()    # t2线程执行完毕,才能继续往后走
print(number)

0

import threadingnumber = 0def _add():global numberfor i in range(10000000):number += 1def _sub():global numberfor i in range(10000000):number -= 1t1 = threading.Thread(target = _add)
t2 = threading.Thread(target = _sub)
t1.start()
t2.start()
t1.join()  # t1线程执行完毕,才能继续往后走
t2.join()    # t2线程执行完毕,才能继续往后走
print(number)

-756842

守护进程

import threading
import timedef task(arg):time.sleep(5)print('任务')if __name__ == '__main__':t = threading.Thread(target=task,args=(11,))t.setDaemon(True)    # 设置为守护2进程t.start()print('END')

END

线程名称设置

import threadingdef task(arg):# 获取当前执行此代码的线程name = threading.current_thread().getName()print(name)for i in range(10):t = threading.Thread(target = task,args=(11,))t.setName('小黑无敌酒量')t.start()

小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量
小黑无敌酒量

自定义线程类,写在run里

import threading
class MyThread(threading.Thread):def run(self):print('执行此线程',self._args)
if __name__ == '__main__':t = MyThread(args=(100,))t.start()

执行此线程 (100,)

import requests
import threadingclass DouYinThread(threading.Thread):def run(self):file_name,video_url = self._argsres = requests.get(video_url)with open(file_name,mode='wb') as f:f.write(res.content)url_list = [("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]for item in url_list:t = DouYinThread(args=(item[0],item[1]))t.start()

线程安全

import threadinglock_object = threading.RLock()loop = 10000000
number = 0def _add(count):lock_object.acquire()    # 申请锁global numberfor i in range(count):number += 1lock_object.release()    # 释放锁def _sub(count):lock_object.acquire()global numberfor i in range(count):number -= 1lock_object.release()t1 = threading.Thread(target=_add,args=(loop,))
t2 = threading.Thread(target=_sub,args=(loop,))
t1.start()
t2.start()t1.join()
t2.join()
print(number)

0

import threadingnum = 0def task():global numfor i in range(1000000):num += 1print(num)if __name__ == '__main__':for i in range(2):t = threading.Thread(target=task)t.start()

1552302
1971094

import threadingnum = 0
lock_object = threading.RLock()def task():print('开始')lock_object.acquire()    # 第一个抵达的线程进入并上锁,其他线程就需要在此等待。global numfor i in range(1000000):num += 1lock_object.release()    # 线程出去,并解开锁print(num)for i in range(2):t = threading.Thread(target=task)t.start()

开始
开始
1000000
2000000

Lock同步锁

import threadingnum = 0
lock_object = threading.Lock()def task():print('开始')global numlock_object.acquire()for i in range(1000000):num += 1lock_object.release()print(num)for i in range(2):t = threading.Thread(target = task)t.start()

开始
开始
1000000
2000000

RLock递归锁

import threadingnum = 0
lock_object = threading.RLock()def task():print('开始')global numlock_object.acquire()for i in range(1000000):num += 1lock_object.release()print(num)for i in range(2):t = threading.Thread(target = task)t.start()

开始
开始
1000000
2000000

递归锁多次申请锁和多次释放

import threading
import timelock_object = threading.RLock()def task():print('开始')lock_object.acquire()lock_object.acquire()print(123)lock_object.release()lock_object.release()for i in range(3):t = threading.Thread(target=task)t.start()

开始
123
开始开始
123

123

递归锁实例

import threading
lock = threading.RLock()# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全
def func():with lock:pass# 程序员B想用那个程序
def run():print('其他功能')func()print('其他功能')# 程序员C也想用,但是得加锁
def process():with lock:print('其他功能')func()    # 此时会出现多次锁的情况,只有Rlock支持print('其他功能')

死锁案例

import threadingnum = 0
lock_object = threading.Lock()def task():print('开始')lock_object.acquire()lock_object.acquire()global numfor i in range(100000):num += 1lock_object.release()lock_object.release()print(2)for i in range(2):t = threading.Thread(target = task)t.start()
import threading
import timelock_1 = threading.Lock()
lock_2 = threading.Lock()def task1():lock_1.acquire()time.sleep(1)lock_2.acquire()print(11)lock_2.release()print(111)lock_1.release()print(11111)def task2():lock_2.acquire()time.sleep(1)lock_1.acquire()print(22)lock_1.release()print(3333)lock_2.releasr()print(2222222)if __name__ == '__main__':t1 = threading.Thread(target=task1)t1.start()t2 = threading.Thread(target=task2)t2.start()

线程池

import time
from concurrent.futures import ThreadPoolExecutordef task(video_url):print('开始执行任务',video_url)time.sleep(5)# 创建线程池,最多维护10个进程
pool = ThreadPoolExecutor(10)url_list = ['www.{}'.format(i) for i in range(300)]for url in url_list:pool.submit(task,url)
print('执行中...')
pool.shutdown(True)
print('继续走下去')
import time
import random
from concurrent.futures import ThreadPoolExecutor,Futuredef task(video_url):print('开始执行任务',video_url)time.sleep(2)return random.randint(0,10)def done(response):print('任务执行后的返回值',response.result())if __name__ == '__main__':# 创建线程池,最多维护10个线程pool = ThreadPoolExecutor(10)url_list = ['www.xxxx-{}.com'.format(i) for i in range(15)]for url in url_list:# 在线程池里提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕再将线程还给线程池;如果没有空闲线程则等待。future = pool.submit(task,url)future.add_done_callback(done)

最终统一的效果

import time
import random
from concurrent.futures import ThreadPoolExecutor,Futuredef task(video_url):print('开始执行任务',video_url)time.sleep(2)return random.randint(0,10)if __name__ == '__main__':pool = ThreadPoolExecutor(10)future_list = []url_list = ['www.xxxx-{}.com'.format(i) for i in range(15)]for url in url_list:future = pool.submit(task,url)future_list.append(future)pool.shutdown(True)for fu in future_list:print(fu.result())

线程池下载图片案例(需要自己写一遍)

import os
import requests
from concurrent.futures import ThreadPoolExecutordef download(image_url):res = requests.get(url = image_url,headers = {'User-Agent':"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"})return resdef outer(file_name):def save(response):res = response.result()# 写入本地# 检查images目录是否存在?不存在,则创建一个images目录if not os.path.exists('images'):# 创建images目录os.makedirs('images')file_path = os.path.join('images',file_name)# 将图片写入文件with open(file_path,mode='wb') as img_object:img_object.write(res.content)return saveif __name__ == '__main__':pool = ThreadPoolExecutor(10)with open('mv.csv',mode='r',encoding='utf-8') as file_object:for line in file_object:nid,name,url = line.strip().split(',')file_name = '{}.png'.format(name)fur = pool.submit(download,url)fur.add_done_callback(outer(file_name))

在这里插入图片描述

小黑生活

请添加图片描述

请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述

明天北马目标:安全回家


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部