1 系统功能概述
爬虫系统包括爬虫任务管理系统和数据爬取系统。
爬虫任务管理系统包括爬虫任务的 crud、爬虫任务执行的启动和停止功能。
数据爬取系统用于数据的爬取和入库。
2 技术实现概述
使用 xxl-job 框架构建爬虫任务管理系统;
使用 pyhon 的 django 框架构建数据爬取系统;
将数据爬取系统注册到 xxl-job 系统中,通过管理定时任务的方式来管理爬虫任务。
3 将数据爬取系统注册到 xxl-job 系统中
3.1 配置
配置xxl-job 服务端信息和 xxl-job 执行器信息。举例如下。
# xxl-job 服务端
# xxl-admin服务端暴露的restful接口url(如http://localhost:8080/xxl-job-admin/api/)
xxl_admin_baseurl: str = 'http://xxl-job-test.com/api/'
# 请求令牌
xxl_job_access_token: str = 'test_token'
# xxl-job 执行器信息
executor_app_name: str = 'test-spider-web'
executor_port: int = 9999
3.2 代码
3.2.1 注册执行器到 xxl-job 服务端
数据爬取系统通过调用 xxl-job 的 registry 接口来注册执行器到 xxl-job 服务端。举例如下所示。
# 项目启动时,异步执行注册执行器到xxl-job服务端
register_async()
# 异步执行注册执行器到xxl-job服务端
def register_async():
logger.info("register_async 注册执行器到xxl-job服务端")
p = pool(1)
p.apply_async(register_node, ())
logger.info("register_async 注册执行器到xxl-job服务端成功")
# 注册执行器到xxl-job服务端
def register_node():
# 必须循环去注册,不然会显示为离线
try:
while true:
registry()
time.sleep(10)
finally:
logger.error("register node is exit.")
# 注册
def registry():
payload = {
"registrygroup": "executor",
"registrykey": settings.executor_app_name,
"registryvalue": executor_baseurl()
}
try:
headers = {"xxl-job-access-token": settings.xxl_job_access_token}
response = post_simple(settings.xxl_admin_baseurl + "registry", payload, headers)
if response.get("code") != 200:
logger.error("registry error. {}".format(str(response)))
return false
return true
except baseexception as e:
logger.error("registry error. {}".format(str(e)))
def executor_baseurl() -> str:
return "http://{host}:{port}".format(host=get_network_ip(), port=settings.executor_port)
3.2.2 接口调用工具类
def post_simple(url, data, headers):
times = 0
while times < settings.request_retry_times:
try:
response = requests.post(url, json=data, headers=headers)
if response.status_code != 200:
logger.error("post 请求失败. url:{}, data:{}. response:{}".format(url, data, response.text))
raise businesserror("请求失败")
else:
return response.json()
except baseexception as e:
times += 1
logger.warning(
"post 请求连接失败. times:{}, retry after:{}, url:{}. data:{}.error:{}"
.format(times, settings.request_retry_interval, url, str(data), str(e))
)
time.sleep(settings.request_retry_interval)
logger.error("post 请求连接失败. url:{}, data:{}".format(url, data))
raise businesserror("请求连接失败")
def get_network_ip() -> str:
"""获取本机地址,会获取首个网络地址"""
_, _, ipaddrlist = socket.gethostbyname_ex(socket.gethostname())
return ipaddrlist[0]
4 定时任务执行
4.1 定时任务的执行流程
xxl-job调度中心通过http的方式,调用执行器的任务。具体流程如下:
- 调度中心将任务调度信息推送给执行器。这些任务调度信息主要包括:任务id、本次调度日志id、本次调度日志时间、任务参数等。
- 执行器在收到调度信息后,启动任务的执行。这一过程在执行器的机器上进行。
- 任务执行完毕后,执行器将执行结果返回给调度中心。这些执行结果主要包括:执行成功或失败、执行日志等。
4.2 触发任务执行
4.2.1 数据爬取系统暴露名称为“run” 的 http 接口
如 http://localhost:9999/run 。 爬取逻辑全部放在 run 接口中。举例如下。
# (1)接口暴露 urls.py 文件
urlpatterns = [
path('run', spider.run, name='run'),
]
# (2)run 接口 spider.py 文件
@api_view(['post'])
def run(request):
"""
爬虫执行入口
"""
logger.info("============ run 执行定时任务 start")
# 入参
request_data = jsonparser().parse(request)
executor_handler = request_data['executorhandler']
executor_params = request_data['executorparams']
executor_params_dict = json.loads(executor_params)
# 任务异步执行
if executor_handler == 'task01':
submit_task(taskservice01.doxxljobtask, (executor_params_dict, request_data))
if executor_handler == 'task02':
submit_task(taskservice02.doxxljobtask, (executor_params_dict, request_data))
logger.info("============ run 执行定时任务 finish")
return httpresponse(json.dumps(dict(code=200, msg='成功')))
# (3)线程池工具类 thread_pool_util.py 文件
import logging
from multiprocessing.dummy import pool
# 通用线程池
pool = pool(10000)
logger = logging.getlogger(__name__)
# 异步执行任务
def submit_task(func, args) -> none:
try:
pool.apply_async(func, args)
except baseexception as e:
logger.error("submit_task:执行失败. error:{}".format(str(e)))
4.2.2 xxl-job 定时调用上述 “run” 接口
xxl-job 调度中心通过http的方式,调用执行器的任务。
具体来说是指 xxl-job 通过调用执行器的 “run” 接口来执行定时任务。
4.3 任务状态上报
任务执行完成后,数据爬取系统通过调用 xxl-job 的 callback 接口来进行定时任务执行状态上报。
# 任务状态上报. code:200-表示任务执行正常,500-表示失败,100-执行中
def callback(log_id, timestamp, code):
if log_id is none or timestamp is none:
return false
payload = [
{
"logid": log_id, # 本次调度日志id
"logdatetim": timestamp, # 本次调度时间
"handlecode": code # 任务状态
}
]
try:
headers = {"xxl-job-access-token": settings.xxl_job_access_token}
response = post_simple(settings.xxl_admin_baseurl + "callback", payload, headers)
if response.get("code") != 200:
logger.error("callback error.log_id:{}.response{}".format(log_id, str(response)))
return false
return true
except baseexception as e:
logger.error("callback error. {}".format(str(e)))
发表评论