Python实现多线程、多进程爬虫
1.多线程的线程守护
在python3中,主线程主进程结束,子进程不会结束,为了能够让主线程回收子线程,可以把子线程设置为守护线程,即该线程不重要,主线程结束,子线程结束:
举个例子:
import time
import threadingdef test():while True:print('测试线程守护!!',threading.currentThread())time.sleep(1)if __name__ == '__main__':t1 = threading.Thread(target=test)t2 = threading.Thread(target=test)t1.start()t2.start()
执行以上代码,得出的结果是交替打印出两个线程,由于线程执行的函数为死循环,所以子线程也会陷入死循环,不管你怎么按ctrl+c也无法停止线程。如下:
(venv) E:\test>python test.py
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
测试线程守护!! <Thread(Thread-1, started 9652)>
测试线程守护!! <Thread(Thread-2, started 2728)>
那么怎样才能可以避免这种问题呢?或者说,怎样才能在主线程退出的时候,子线程也自动退出呢?没错就是使用线程守护,也就是等到主线程执行完,子线程也强制停止,换句话说就是子线程会随着主线程一起结束。使用案例:
t1 = threading.Thread(targe=func,args=(,))
t1.setDaemon(True)
t1.start() #此时线程才会启动
优化前面死循环不能停止的线程,源码如下:
import time
import threadingdef test():while True:print('测试线程守护!!',threading.currentThread())time.sleep(1)if __name__ == '__main__':t1 = threading.Thread(target=test)t2 = threading.Thread(target=test)t1.setDaemon(True)t1.start()t2.setDaemon(True)t2.start()
输出结果:
(venv) E:\test>python test.py
测试线程守护!! <Thread(Thread-1, started daemon 5884)>
测试线程守护!! <Thread(Thread-2, started daemon 7112)>
可以看到,运行一次就直接退出了,因为主线程已经执行完了,确实是主线程已经结束了,正因为设置了守护线程,所以这时候子线程也一并退出了。
2.队列模块的使用
from queue import Queue
q = Queue(maxsize=100)
item = {}
q.put_nowait(item) #不等待直接放,队列满的时候会报错
q.put(item) #放入数据,队列满的时候会等待
q.get_nowait() #不等待直接取,队列空的时候会报错
q.get() #取出数据,队列为空的时候会等待
q.qsize() #获取队列中现存数据的个数
q.join() #队列中维持了一个计数,计数不为0时候让主线程阻塞等待,队列计数为0的时候才会继续往后执行
q.task_done() # put的时候计数+1,get不会-1,get需要和task_done 一起使用才会-1
3. 线程中使用队列
## 队列可用于线程间的数据通讯
from queue import Queue
import threadingq = Queue()
def add_to_queue():for i in range(0, 100):print("存入队列: {}".format(i))q.put(i)def get_from_queue():# 但是在我们获取队列元素的时候, 我们并不知道队列中放了几个元素,# 这个时候我们就会使用while的死循环来获取,知道取完为止# for i in range(0, 100):while True:print("从队列中取出: {}".format(q.get()))q.task_done()# 创建线程
t = threading.Thread(target=add_to_queue)
# 设置为守护线程
t.setDaemon(True)
# 启动线程
t.start()t = threading.Thread(target=get_from_queue)
t.setDaemon(True)
t.start()# 队列加入主线线程, 等待队列中任务完成为止
q.join()
4. 装饰器的基本使用
装饰器的作用: 不改变原有函数,给函数添加额外功能
# 定义装饰一个死循环执行任务的装饰器
def run_forever(func):def forever(obj):while True:func(obj)return forever# 使用装饰器
@run_forever # 等同于 run_forever(run)
def run(obj):print("人生苦短,我用Python {}".format(obj))# 执行函数 --死循环
run('Hpayy')
糗事百科多线程
实现步骤:
- 1.在init方法中, 创建 URL队列, 响应队列, 数据队列
- 2.在生成URL列表中方法中,把URL添加URL队列中
- 3.在请求页面的方法中,从URL队列中取出URL执行,把获取到的响应数据添加响应队列中
- 4.在处理数据的方法中,从响应队列中取出页面内容进行解析, 把解析结果存储数据队列中
- 5.在保存数据的方法中, 从数据队列中取出数据,进行保存
- 6.开启几个线程来执行上面的方法
注意: 开的线程数取决于这个任务耗时, 耗时长的就多开几个线程.
多线程爬虫源码:
import threading
import requests
from lxml import etree
import json
from queue import Queue# 多线程思路:
# 把所有待采集得链接url放进一个url队列里面
# 在请求页面得过程中,以多线程得方式从队列中取出url,请求结束后要执行task_done()方法来释放队列避免重复采集
# 为了更快得处理相应后数据(页面数据),也可以把每个页面获得得页面源码放在相应队列中,一样处理完得task_done()来清队列
# 最后得保存数据也可以开一个队列用于保存
# 可以设置线程数量来处理数据# 由于请求、处理数据需要较多次数的轮循,所以可以直接写个修饰器函数用于循环某个函数
def run_forever(func):def wrapper(obj):while True:func(obj)return wrapperclass CrawlData:def __init__(self):self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'}self.url_pattern = 'https://www.qiushibaike.com/8hr/page/{}/'self.url_queue = Queue() # url 队列self.page_queue= Queue() # 页面 队列self.data_queue = Queue() # 数据 队列# 把待采集的url放入队列def add_url_to_queue(self):for i in range(1, 14):self.url_queue.put(self.url_pattern.format(i))@run_foreverdef add_page_to_queue(self):url = self.url_queue.get() # 从队列中获取一个url,get不会做-1操作result = requests.get(url, headers=self.headers)if result.status_code != 200:self.url_queue.put(url)else:self.page_queue.put(result.text)#结束当前url任务self.url_queue.task_done()@run_foreverdef add_data_to_queue(self):# 从队列中获取一个页面数据出来待处理page = self.page_queue.get()element = etree.HTML(page)div_s = element.xpath('//div[@class="recommend-article"]/ul/li')# 遍历分组列表, 再使用xpath获取内容dz_list = []for div in div_s:item = {}# 每个段子包含发送人头像URL, 昵称, 性别, 段子内容, 好笑数,评论数# 头像URLitem['head_url'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/img/@src'))if item['head_url'] is not None:item['head_url'] = 'http:' + item['head_url']# 昵称item['author_name'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/span/text()'))dz_list.append(item)# 解析好得数据放入队列self.data_queue.put(dz_list)# 结束当前页面数据任务self.page_queue.task_done()def get_first_element(self, list):'''获取列表中第一个元素,如果是空列表就返回None'''return list[0] if len(list) != 0 else None# 保存数据@run_foreverdef save_data(self):# 从已有的数据中获取一个处理后的数据进行存储data = self.data_queue.get()with open('qiubai.json', 'a', encoding='utf8') as f:for dt in data:json.dump(dt, f, ensure_ascii=False)f.write('\n')self.data_queue.task_done() # 结束当前数据任务def run_count_use(self, func, count):# 开启多个线程处理for i in range(count):t = threading.Thread(target=func)t.setDaemon(True)t.start()def run(self):# 把待处理的url加入队列url_t = threading.Thread(target=self.add_url_to_queue)url_t.start()# 开启获取页面数据线程self.run_count_use(self.add_page_to_queue, 3)# 开启处理解析页面数据 self.run_count_use(self.add_data_to_queue, 2)# 开启数据存储线程self.run_count_use(self.save_data, 2)# 使用队列join方法,等待队列数据处理结束self.url_queue.join() # 等待url队列为空时才会往下继续self.page_queue.join() # 等待页面队列为空时才会往下继续self.data_queue.join() # 等待数据队列为空时才会往下继续if __name__ == '__main__':import timestart = time.time()CrawlData().run()end = time.time()print('总共用时:{}'.format((end-start)))
多进程爬取糗百
实现步骤
- 开启一个进程池Pool,一般是基于cpu个数
- 直接遍历循环待采集的url,并使用apply_async开启异步进程
- 按找需求解析源码,获取想要采集的数据
源码:
from multiprocessing import Pool
import json
import requests
import os
import time
from lxml import etreeclass CrawlData:def __init__(self, url):self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'}self.url = urldef get_page_data(self):print('当前子进程{}'.format(os.getpid()))page_data = requests.get(self.url, headers=self.headers)element = etree.HTML(page_data.text)div_s = element.xpath('//div[@class="recommend-article"]/ul/li')# 遍历分组列表, 再使用xpath获取内容dz_list = []for div in div_s:item = {}item['head_url'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/img/@src'))if item['head_url'] is not None:item['head_url'] = 'http:' + item['head_url']# 昵称item['author_name'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/span/text()'))dz_list.append(item)self.save_data(dz_list)def get_first_element(self, list):'''获取列表中第一个元素,如果是空列表就返回None'''return list[0] if len(list) != 0 else Nonedef save_data(self, data):# 从已有的数据中获取一个处理后的数据进行存储with open('qiubai_process.json', 'a', encoding='utf8') as f:for dt in data:json.dump(dt, f, ensure_ascii=False)f.write('\n')# 采集入口函数
def crawl(url):CrawlData(url).get_page_data()if __name__ == '__main__':start = time.time()url_pattern = 'https://www.qiushibaike.com/8hr/page/{}/'p = Pool(4) # 不传参默认以最大cpu数作为进程数for i in range(1,14):url = url_pattern.format(str(i))p.apply_async(crawl, args=(url,))p.close()p.join()end = time.time()print(f'总共用时:{(end-start)}')
运行结果:
- 单进程 Pool(1):
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
当前子进程10356
总共用时:13.325877666473389
- 双进程 Pool(2):
当前子进程3320
当前子进程9492
当前子进程3320
当前子进程9492
当前子进程9492
当前子进程3320
当前子进程9492
当前子进程3320
当前子进程3320
当前子进程9492
当前子进程3320
当前子进程9492
当前子进程3320
总共用时:1.3394155502319336
- 三进程 Pool(3):
当前子进程10908
当前子进程9124
当前子进程3148
当前子进程10908
当前子进程9124
当前子进程3148
当前子进程3148
当前子进程9124
当前子进程10908
当前子进程9124
当前子进程3148
当前子进程10908
当前子进程3148
总共用时:1.1449363231658936
- 四进程 Pool(4):
当前子进程11208
当前子进程7944
当前子进程9188
当前子进程9916
当前子进程9916
当前子进程11208
当前子进程9188
当前子进程7944
当前子进程9916
当前子进程7944
当前子进程11208
当前子进程9188
当前子进程11208
总共用时:0.9803769588470459
下面再来个爬取知乎的例子
采集知乎一定要登录才可以,否则无法正常获取数据,本文采用cookies携带登录信息请求数据,具体之前文章《Python爬虫之利用cookies跳过登陆验证码》已经介绍过。其中cookies一定要用等登陆后从浏览器里面获取,并用于请求中,不然会报以下错误:
{"code":"100010","'message'":"Unsupported auth type oauth"}
多进程采集知乎
from multiprocessing import Pool
import json
import os
import requests
import time
from queue import Queueclass CrawlZH(object):def __init__(self, url, cookies):self.url = urlself.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36'}self.cookies = cookiesdef get_json(self):session = requests.session()session.headers = self.headersrequests.utils.add_dict_to_cookiejar(session.cookies, self.cookies)response = session.get(self.url)result = json.loads(response.text)print("当前子进程-{}运行结果:{}".format(os.getpid(), len(result['data'])))def main(url, cookies):# print('当前子进程:{}'.format(os.getpid()))CrawlZH(url, cookies).get_json()if __name__ == '__main__':# 这里cookies是你正常的登陆后从浏览器里面复制出来cookies = { ...}url_pattern = 'https://www.zhihu.com/api/v3/feed/topstory/recommend?desktop=true&page_number={}&limit=6'# 开启多进程采集 start === print('母进程:{}'.format(os.getpid()))start = time.time()p = Pool()for page_number in range(1, 11):url = url_pattern.format(str(page_number))p.apply_async(main, args=(url, cookies,))p.close()p.join()end = time.time()print(f'总共耗时:{(end-start)}')# 多进程 end ==== # 多线程采集 start ===start = time.time()CrawlZHThread(cookies).run()end = time.time()print(f'总共耗时:{(end-start)}')# 多线程 end ====
结果:
多进程结果:
母进程:5024
当前子进程-6372运行结果:6
当前子进程-12984运行结果:6
当前子进程-4536运行结果:6
当前子进程-12924运行结果:6
当前子进程-12984运行结果:6
当前子进程-6372运行结果:6
当前子进程-12924运行结果:6
当前子进程-4536运行结果:6
当前子进程-12984运行结果:6
当前子进程-6372运行结果:6
总共耗时:2.6678614616394043
多线程采集知乎
# 简单 修饰器 用于无限循环
def run_forever(func):def wrapper(obj):while True:func(obj)return wrapperclass CrawlZHThread(object):def __init__(self, cookies):self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36'}self.cookies = cookiesself.url_pattern = 'https://www.zhihu.com/api/v3/feed/topstory/recommend?desktop=true&page_number={}&limit=6'self.url_queue = Queue()def add_url_to_queue(self):for i in range(1, 11):self.url_queue.put(self.url_pattern.format(str(i)))def get_json(self):while True:url = self.url_queue.get()session = requests.session()session.headers = self.headersrequests.utils.add_dict_to_cookiejar(session.cookies, self.cookies)response = session.get(url)result = json.loads(response.text)print("当前子线程-{}运行结果:{}".format(threading.current_thread().name, len(result['data'])))self.url_queue.task_done()@run_foreverdef get_json_01(self):url = self.url_queue.get()session = requests.session()session.headers = self.headersrequests.utils.add_dict_to_cookiejar(session.cookies, self.cookies)response = session.get(url)result = json.loads(response.text)print("当前子线程-{}运行结果:{}".format(threading.current_thread().name, len(result['data'])))self.url_queue.task_done()def run(self):url_t = threading.Thread(target=self.add_url_to_queue)url_t.start()# 开启多线程for i in range(4):t = threading.Thread(target=self.get_json)t.setDaemon(True)t.start()self.url_queue.join()if __name__ == '__main__':# 这里cookies是你正常的登陆后从浏览器里面复制出来cookies = { ...}# 多线程采集 start ===start = time.time()CrawlZHThread(cookies).run()end = time.time()print(f'总共耗时:{(end-start)}')# 多线程 end ====
结果:
多线程结果:
当前子线程-Thread-3运行结果:6
当前子线程-Thread-4运行结果:6
当前子线程-Thread-5运行结果:6
当前子线程-Thread-2运行结果:6
当前子线程-Thread-3运行结果:6
当前子线程-Thread-4运行结果:6
当前子线程-Thread-5运行结果:6
当前子线程-Thread-2运行结果:6
当前子线程-Thread-3运行结果:6
当前子线程-Thread-4运行结果:6
总共耗时:2.4344863891601562
其中get_json与get_json_01结果是一样的,只不过一个用了修饰器,用修饰器可以提高可读性
对比以上两种采集方式,时间差不多,我只做获取页面数据,没有做后续数据处理,所以没有显著差异!!
注意本文的创作时间,后期页面源码调整,可能将采集不到数据,也属正常现象,本文主要目的是提供多进程、多线程爬虫的思想逻辑,仅供参考!!
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
