手把手教你在Windows下设置分布式队列Celery的心跳轮询

点击上方“Python爬虫与数据挖掘”,进行关注

回复“书籍”即可获赠Python从入门到进阶共10本电子书

终是圣明天子事,景阳宫井又何人。

/1 前言/

    大家好,我是吴老板。用Celery 官方的话来说,Celery 是一个非常优秀的分布式队列,可应用于分布式共享中间队列和定时任务等等。

/2 版本的差异/

    Celery 有很多个版本,各版本之间的差异可谓不小,比如最新的 Celery6.0 版本在稳定性远不如 Celery4.0,所以在使用不同版本的时候,系统给到我们的反馈可能并不能如我们所愿。

/3 服务/

     在 windows 下挂在 Celery 服务有时候会出现不稳定的情况(unix中暂时未发现这种情况),比如在执行定时任务的时候,过了一段时间之后,Celery 出现了假死状态,以至于不能按照我们指定的时间点去执行任务。

这些任务只是加入到待运行队列中(堆积在 Redis 中),只能人为重启 Celery 服务之后才能将堆积的任务释放出来运行。

这样一来,第一是定时任务在指定时间点没有正常运行,其二是在其他时间运行了这些任务,很可能会产生更新数据不及时,时间节点混乱的问题,不仅达不到业务需求,还会反受其害。

/4 设置心跳/

    为了解决 Celerywindows 中的这种弊端,可以为 Celery 任务队列设置一个心跳时间,比如每一分钟或者每五分钟向 Redis 数据库发送一次数据以保证队列始终是活跃的状态,这样只要你的电脑不关机并保持网络畅通(如果是远程 Redis),Celery 任务队列服务就不会出现假死状态。

/5 举个栗子/

    我总是很喜欢用示例来说话,前些时间在对某平台的商家后台进行数据采集的时候,为了使用时能自动获取该网站的 cookie

    用Pyppeteer 写了一个自动化登陆的脚本,和往常一样仍在 Celery 队列中并迅速的启动服务。

    脚本是这样的(非常接近实际的伪代码,没办法,保命要紧)

# -*- coding: utf-8 -*-
from db.redisCurd import RedisQueue
import asyncio
import random
import tkinter
from pyppeteer.launcher import launch
from platLogin.config import USERNAME, PASSWORD, LOGIN_URLclass Login():def __init__(self, shopId):self.shopId = shopIdself.RedisQueue = RedisQueue("cookie")def screen_size(self):tk = tkinter.Tk()width = tk.winfo_screenwidth()height = tk.winfo_screenheight()tk.quit()return {'width': width, 'height': height}async def login(self, username, password, url):browser = await launch({'headless': False,'dumpio': True},args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'],)page = await browser.newPage()  # 启动新的浏览器页面try:await page.setViewport(viewport=self.screen_size())await page.setJavaScriptEnabled(enabled=True)  # 启用jsawait page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')await self.page_evaluate(page)await page.goto(url)await asyncio.sleep(2)# 输入用户名,密码await page.evaluate(f'document.querySelector("#userName").value=""')await page.type('#userName', username, {'delay': self.input_time_random() - 50})  # delay是限制输入的时间await page.evaluate('document.querySelector("#passWord").value=""')await page.type('#passWord', password, {'delay': self.input_time_random()})await page.waitFor(6000)loginImgVcode = await page.waitForSelector('#checkCode')  await loginImgVcode.screenshot({'path': './loginImg.png'})await page.waitFor(6000)res = use_cjy("./loginImg.png")pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234"await page.waitFor(6000)await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})await page.waitFor(6000)await page.click('#subMit')await page.waitFor(6000)await asyncio.sleep(2)await self.get_cookie(page)await page.waitFor(3000)await self.page_close(browser)return {'code': 200, 'msg': '登陆成功'}except:return {'code': -1, 'msg': '出错'}finally:await page.waitFor(3000)await self.page_close(browser)# 获取登录后cookieasync def get_cookie(self, page):cookies_list = await page.cookies()cookies = ''for cookie in cookies_list:str_cookie = '{0}={1}; 'str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))cookies += str_cookie# 将cookie 放入 cookie 池self.RedisQueue.put_hash(self.shopId, cookies)return cookiesasync def page_evaluate(self, page):await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }''')await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')await page.waitFor(3000)async def page_close(self, browser):for _page in await browser.pages():await _page.close()await browser.close()def input_time_random(self):return random.randint(100, 151)def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):loop = asyncio.get_event_loop()i_future = asyncio.ensure_future(self.login(username, password, url))loop.run_until_complete(i_future)return i_future.result()if __name__ == '__main__':Z = Login(shopId="001")Z.run()

Celery 任务文件是这样的

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
import sys
import time
from db.redisCurd import RedisQueue
from send_msg.weinxin import Send_msg
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from logger.logger import log_v
from celery import Task
from platLogin.login import Login  # 登陆类
from celery import CeleryrandomQueue = RedisQueue("cookie")celery_app = Celery('task')
celery_app.config_from_object('celeryConfig')S = Send_msg()dl_dict = {'demo': {'cookie': '','loginClass': 'Login',}
}# todo 这是三种运行的状态
class task_status(Task):def on_success(self, retval, task_id, args, kwargs): log_v.info('任务信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))def on_failure(self, exc, task_id, args, kwargs, einfo):  log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))def on_retry(self, exc, task_id, args, kwargs, einfo): log_v.warning('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc))# todo 随便找个hash key作为轮询对象, celery在win10系统可能不太稳定,有时候会有连接断开的情况
@celery_app.task(base=task_status)
def get_cookie_status(platName="demo"):try:# log_v.debug(f'[+] 轮询 {platName} 定时器启动 ..... Done')randomQueue.get_hash(platName).decode()log_v.debug(f'[+] 轮询 {platName} 成功 ..... Done')return "Erp 轮询成功"except:return "Erp 轮询失败"@celery_app.task(base=task_status)
def set_plat_cookie(platName="demo", shopId=None):log_v.debug(f"[+] {platName} 正在登陆")core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)result = core.run()return result

Celery 配置文件是这样的

from __future__ import absolute_import
import datetime
from kombu import Exchange, Queue
from celery.schedules import crontab
from urllib import parseBROKER_URL = f'redis://root:{parse.quote("你的不规则密码")}@主机:6379/15'# 导入任务,如tasks.py
CELERY_IMPORTS = ('monitor.tasks',)# 列化任务载荷的默认的序列化方式
CELERY_TASK_SERIALIZER = 'json'# 结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'CELERYBEAT_SCHEDULE = {'add-every-60-seconds': {'task': 'tasks.get_cookie_status','schedule': datetime.timedelta(minutes=1),  # 每 1 分钟执行一次'args': ()  # 任务函数参数},
}

启动服务

celery -A tasks beat -l INFO
celery -A tasks worker -l INFO -c 2

    以 2 个线程启动消费者队列服务并启用定时任务,当发现当前平台的 cookie 不可用时,我会向 Celery 发送一个信号(就是调用了前面的set_plat_cookie 这个方法),消费者得到这个任务这个就会执行自动化脚本以获取 cookie 并储存在 Redis 中,使用时在从 Redis 中获取就能正常请求到该平台的数据。

    在空闲时间,Celery中的 get_cookie_status 方法会每隔一分钟向 Redis 请求数据,这就是我们设置的 1分钟心跳。

    这样不管我们的 Celery 是否是后台启动,都不会出现假死、卡死的状态,则万事大吉矣!!

/6 总结/

    本文为了解决 Celerywindows 中的这种弊端,为 Celery 任务队列设置一个心跳时间,比如每一分钟或者每五分钟向 Redis 数据库发送一次数据以保证队列始终是活跃的状态,这样只要你的电脑不关机并保持网络畅通(如果是远程 Redis),Celery 任务队列服务都不会出现假死、卡死的状态。

------------------- End -------------------

往期精彩文章推荐:

  • 手把手教你利用Python轻松拆分Excel为多个CSV文件

  • Python项目实战——手把手教你使用Django框架实现支付宝付款

  • 手把手教你用Python爬取百度搜索结果并保存

欢迎大家点赞,留言,转发,转载,感谢大家的相伴与支持

想加入Python学习群请在后台回复【入群

万水千山总是情,点个【在看】行不行

/今日留言主题/

随便说一两句吧~


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部