pip install quart
https://gitlab.com/pgjones/quart/
/gsxt/webapi.py
# THE WINTER IS COMING! the old driver will be driving who was a man of the world!
# -*- coding: utf-8 -*-
# python 3.6.7, create time is 18-11-9 下午7:27 GMT+8
import os
from uuid import uuid1
from base64 import b64encode
from redis import StrictRedis
from quart import (
    Quart,
    make_response,
    request,
    jsonify,
)
import static

# Module-wide Redis client. decode_responses=True asks redis-py to return
# str instead of bytes for replies.
redis = StrictRedis(decode_responses=True)

# The Quart application object that the route handlers register on.
app = Quart(__name__)
# Let jsonify() emit non-ASCII text (e.g. Chinese) as UTF-8 instead of
# \uXXXX escapes.
app.config['JSON_AS_ASCII'] = False

# Characters accepted in a task token. Tokens end up inside a file path and
# inside the captcha HTML page, so anything outside this set is rejected.
_TOKEN_CHARS = frozenset(
    'abcdefghijklmnopqrstuvwxyz'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    '0123456789-_'
)


def _is_safe_token(token):
    """Return True if *token* is non-empty and made only of ``_TOKEN_CHARS``."""
    return bool(token) and all(ch in _TOKEN_CHARS for ch in token)


@app.route('/', methods=['GET'])
async def index():
    """Home page: serve the API description.

    The body is the module docstring of ``static``, returned as UTF-8
    plain text with status 200.
    """
    headers = {"Content-type": "text/plain; charset=UTF-8"}
    resp = make_response((static.__doc__, 200, headers))
    return await resp


@app.route('/company', methods=['GET'])
async def company():
    """Accept a crawl task and push it onto Redis.

    Query parameters:
        company_name: required, name of the company to look up.
        crack_captcha_mode: optional, defaults to '0'.

    Returns:
        JSON with the task fields (company_name, crack_captcha_mode, token,
        status, msg). On success it also carries ``crack_captcha_url``, as
        promised by the API description in ``static``. On failure
        ``status`` is 'failed'.
    """
    company_name = request.args.get('company_name', None)
    if company_name is None:
        return jsonify({'status': 'failed', 'msg': '/company?company_name=缺少公司名称参数'})

    # Stored as str: a raw UUID object is not a valid Redis hash value.
    # (For quick manual testing a fixed token such as 'captcha_img' can be
    # substituted here.)
    token = str(uuid1())
    gsxt_task_json = {
        'company_name': company_name,
        'crack_captcha_mode': request.args.get('crack_captcha_mode', None) or '0',
        'token': token,
        'status': 'wait',
        'msg': '新任务等待抓取',
    }

    try:
        # Push the task to Redis: the hash holds the task fields, the list
        # is the queue of pending tokens.
        task_key = '{}:{}'.format(static.GSXT_TASK_TOPIC, token)
        redis.hmset(task_key, gsxt_task_json)
        redis.expire(task_key, 600)  # task hash time-to-live, in seconds
        redis.rpush(static.GSXT_TOKEN, token)
        redis.expire(static.GSXT_TOKEN, 590)
    except Exception:
        # Request boundary: report the failure to the caller instead of
        # crashing. Not a bare ``except`` so task cancellation still
        # propagates.
        gsxt_task_json['status'] = 'failed'
        gsxt_task_json['msg'] = '接收任务失败,检查redis及配置'
        return jsonify(gsxt_task_json)

    gsxt_task_json['msg'] = '成功接收抓取任务'
    gsxt_task_json['crack_captcha_url'] = '/crack_captcha?token={}'.format(token)
    return jsonify(gsxt_task_json)


@app.route('/crack_captcha', methods=['GET', 'POST'])
async def crack_captcha():
    """Manual captcha solving.

    GET:  return the HTML page used to solve the captcha by hand. Needs a
          ``token`` query parameter; the image is read from
          ``./images/<token>.jpg``.
    POST: receive the manually entered captcha data (form fields ``token``
          and ``captcha_params``) and store it on the task hash in Redis.

    Returns:
        GET: the rendered HTML string, or a JSON error.
        POST: JSON with ``token`` and ``result_url``, or a JSON error.
    """
    if request.method == 'POST':
        form_data = await request.form
        captcha_params = form_data.get('captcha_params', None)
        token = form_data.get('token', None)
        if token is None or captcha_params is None:
            return jsonify({'status': 'failed', 'msg': '缺少token或captcha_params参数'})

        task_key = '{}:{}'.format(static.GSXT_TASK_TOPIC, token)
        # HSET on a missing key would create a new hash with no expiry, so
        # refuse tasks that are unknown or already expired.
        if not redis.exists(task_key):
            return jsonify({'status': 'failed', 'msg': '任务不存在或已过期'})

        # Store the captcha data on the task hash.
        redis.hset(task_key, 'captcha_params', captcha_params)
        # The captcha image is not removed here; the original cleanup call
        # (os.remove on the image) was disabled.
        return jsonify({
            'token': token,
            'result_url': '/result?token={}'.format(token),
        })

    # Everything else is served as GET.
    token = request.args.get('token', None)
    if token is None:
        return jsonify({'status': 'failed', 'msg': '/crack_captcha?token=缺少token参数'})
    if not _is_safe_token(token):
        # The token goes into a file path and into HTML below.
        return jsonify({'status': 'failed', 'msg': 'token参数不合法'})

    img_file_path = './images/{}.jpg'.format(token)  # image file is named after the token
    try:
        with open(img_file_path, 'rb') as f:
            # Base64-encode the raw image bytes so the page can embed them.
            img_b64_str = b64encode(f.read()).decode()
    except OSError:
        return jsonify({
            'status': 'failed',
            'msg': '没有名为{}.jpg的图片,稍后重试或检查任务是否存在'.format(token),
        })

    return static.img_html % {
        'IMG_BASE64': img_b64_str,
        'CRACK_CAPTCHA_URL': '/crack_captcha',
        'TOKEN': token,
    }


@app.route('/result')
async def result():
    """Query a task's result.

    Query parameters:
        token: required, the task token.

    Returns:
        JSON with every field of the task hash stored in Redis, or a JSON
        error when ``token`` is missing.
    """
    token = request.args.get('token', None)
    if token is None:
        return jsonify({'status': 'failed', 'msg': '/result?token=缺少token参数'})
    task_key = '{}:{}'.format(static.GSXT_TASK_TOPIC, token)
    result_dict = redis.hgetall(task_key)
    return jsonify(result_dict)


if __name__ == '__main__':
    app.run()
# /gsxt/static.py
"""
国家企业公示网实时数据抓取demo

组件说明
    webapi.py 交互演示功能
        任务参数push到redis消息总线中,数据查询也从redis消息总线中获取
        GET /
            接口说明文档
        GET /company
            接收任务参数
            params json = {
                company_name: 公司名称,
                crack_captcha_mode: 默认'0'(手动破解); '1'为调用打码平台破解,
            }
            return json = {
                company_name: 公司名称,
                token: 任务唯一识别码,
                crack_captcha_url: 手动破解验证的url,
            }
        GET /crack_captcha
            获取验证图片并返回手动破解验证码的html
            params json = {
                token: 任务唯一识别码,
            }
            return image in HTML
        POST /crack_captcha
            接收手动输入的验证码信息
            params json = {
                token: 任务唯一识别码,
                captcha_params: 验证码所需参数,
            }
            return json = {
                token: 任务唯一识别码,
                result_url: 查询抓取结果的url,
            }
        GET /result
            params json = {
                token: 任务唯一识别码,
            }
            return gsxt_task:token
    redis 消息总线及缓存
        gsxt_token = [token1, token2, ...]
        gsxt_task:token
            {
                company_name,
                crack_captcha_mode,
                captcha_params,
                status, # 'wait', 任务没开始
                        # 'crawling', 抓取中
                        # 'failed', 失败
                        # 'done', 完成
                msg,
                data,
            }
    crawler.py
        轮询redis抓取数据
    static.py
        静态变量配置文件
"""

# The module docstring above is served verbatim as the API description by
# webapi.py (it returns static.__doc__), so its wording is runtime text.

# Template for the manual captcha page. webapi.py renders it with
# ``img_html % {...}`` using the keys IMG_BASE64, CRACK_CAPTCHA_URL and TOKEN.
# NOTE(review): the HTML markup and the %(...)s placeholders appear to have
# been stripped from this copy - only the visible text is left. Restore the
# original template before relying on this page.
img_html = """
spider_gsxt
X:Y:按顺序依次点击图中的字符,并点击提交
"""

# Configuration
GSXT_TOKEN = 'gsxt_token'  # Redis list of pending task tokens
GSXT_TASK_TOPIC = 'gsxt_task'  # key prefix of the per-task Redis hash ('gsxt_task:<token>')
/gsxt/crawler.py
import time
import redis
import random
import requests
from PIL import Image # pip install pillow
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains

from static import (
    GSXT_TOKEN,  # redis: list
    GSXT_TASK_TOPIC,  # redis: hashmap
)

# NOTE: this rebinds the name ``redis`` from the module to a client instance.
# When running multiple processes, each process needs its own instance.
redis = redis.StrictRedis(decode_responses=True)


class CrawlerServer():
    """Receive tasks from Redis and process them."""

    def crawl(self):
        """Poll the token queue forever and run one crawler per task.

        Each pass pops one token from the ``GSXT_TOKEN`` list, loads the
        matching task hash, hands it to ``GsxtJSCrawler`` and then sleeps
        for five seconds. Never returns. This loop could be changed to run
        several tasks in parallel.
        """
        while True:
            token = redis.lpop(GSXT_TOKEN)
            if token is not None:
                task_dict = redis.hgetall('{}:{}'.format(GSXT_TASK_TOPIC, token))
                if task_dict:
                    print(task_dict)
                    # The original had a disabled interactive prompt here
                    # reminding the operator to use a proxy IP.
                    # Run the crawler for this task.
                    spider = GsxtJSCrawler(task_dict)
                    spider.run()
                else:
                    # The task hash has its own expiry, so a queued token
                    # can outlive it; skip instead of crawling an empty task.
                    print('任务{}不存在或已过期,跳过'.format(token))
            print('等待任务中...')
            time.sleep(5)


# ...... (the rest of crawler.py, including GsxtJSCrawler, is omitted here)
通过 psutil、os、sys 等模块以及 shell 命令获取本节点的状态,根据设定的阈值(cpu、带宽、内存等)来决定是否抢夺 redis 中 gsxt_token 中的任务 token
启动后向消息总线redis中实时注册节点的状态并发送心跳时间戳,用以检测节点的负载以及是否可用
以多进程的方式分别执行不同的token任务
了解 节点任务调度模块的功能和实现