Scrapy 源码分析之 RetryMiddleware 模块
共 11264字,需浏览 23分钟
·
2022-08-11 21:32
↑ 关注 + 星标 ,每天学Python新技能
后台回复【大礼包】送你Python自学大礼包
时隔一个多月,scrapy 章节又迎来了重大更新,今天分享的主题是 RetryMiddleware 中间件。文中若有错误内容,欢迎各位读者多多指正。在阅读的同时不要忘记点赞+关注哦⛽️
一、问题思考
二、文档查寻
三、源码分析
四、源码重写
五、总结分享
趣味模块
娜娜是一名爬虫工程师,最近小娜在采集数据过程中遇到了难题。原因是因为任务积压代理超时了,所有的 request 全部无法下载了。娜娜很是苦恼,不知道如何解决这类型问题。后来小娜看了 TheWeiJun 发表的文章,存在的问题立马迎刃而解,接下来,让我们一起去看看他们是怎么做的吧。
一、问题思考
Question
①使用 scrapy 框架时,如果请求失败,如何保证该请求成功率?
Question
②scrapy 的重试机制是否了解,默认是几次?在什么样的情况下触发?
Question
③scrapy 重试机制,重试状态码有哪些,我们是否可以动态定义?
Question
④scrapy 在重试过程中,如何实时更换代理?如何清除失效的代理?
前言:那么带着这些问题,接下来我们对 Scrapy 源码进行分析探索吧,我相信这篇文章会让大家受益匪浅!
二、文档查寻
1、查看官网文档,搜索指定的模块 RetryMiddleware,搜索结果如下:
说明:观察搜索结果,我们发现官方文档中存在对 RetryMiddleware 模块的解释,接下来让我们点进去,一起去看看官方说明吧。
2、点击搜索结果,查看官方对当前模块的说明解释截图如下:
说明:观察上面的截图,我们发现上面提到的问题大家应该已经知道了部分答案吧。但是还是不够清晰,接下来,让我带大家进入源码分析环节一探究竟吧!
三、源码分析
RetryMiddleware 模块源码如下:
def get_retry_request(
request: Request,
*,
spider: Spider,
reason: Union[str, Exception] = 'unspecified',
max_retry_times: Optional[int] = None,
priority_adjust: Optional[int] = None,
logger: Logger = retry_logger,
stats_base_key: str = 'retry',
):
settings = spider.crawler.settings
stats = spider.crawler.stats
retry_times = request.meta.get('retry_times', 0) + 1
if max_retry_times is None:
max_retry_times = request.meta.get('max_retry_times')
if max_retry_times is None:
max_retry_times = settings.getint('RETRY_TIMES')
if retry_times <= max_retry_times:
logger.debug(
"Retrying %(request)s (failed %(retry_times)d times): %(reason)s",
{'request': request, 'retry_times': retry_times, 'reason': reason},
extra={'spider': spider}
)
new_request: Request = request.copy()
new_request.meta['retry_times'] = retry_times
new_request.dont_filter = True
if priority_adjust is None:
priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST')
new_request.priority = request.priority + priority_adjust
if callable(reason):
reason = reason()
if isinstance(reason, Exception):
reason = global_object_name(reason.__class__)
stats.inc_value(f'{stats_base_key}/count')
stats.inc_value(f'{stats_base_key}/reason_count/{reason}')
return new_request
else:
stats.inc_value(f'{stats_base_key}/max_reached')
logger.error(
"Gave up retrying %(request)s (failed %(retry_times)d times): "
"%(reason)s",
{'request': request, 'retry_times': retry_times, 'reason': reason},
extra={'spider': spider},
)
return None
class RetryMiddleware:
# IOError is raised by the HttpCompression middleware when trying to
# decompress an empty response
EXCEPTIONS_TO_RETRY = (defer.TimeoutError, TimeoutError, DNSLookupError,
ConnectionRefusedError, ConnectionDone, ConnectError,
ConnectionLost, TCPTimedOutError, ResponseFailed,
IOError, TunnelError)
def __init__(self, settings):
if not settings.getbool('RETRY_ENABLED'):
raise NotConfigured
self.max_retry_times = settings.getint('RETRY_TIMES')
self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
self.priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST')
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_response(self, request, response, spider):
if request.meta.get('dont_retry', False):
return response
if response.status in self.retry_http_codes:
reason = response_status_message(response.status)
return self._retry(request, reason, spider) or response
return response
def process_exception(self, request, exception, spider):
if (
isinstance(exception, self.EXCEPTIONS_TO_RETRY)
and not request.meta.get('dont_retry', False)
):
return self._retry(request, exception, spider)
def _retry(self, request, reason, spider):
max_retry_times = request.meta.get('max_retry_times', self.max_retry_times)
priority_adjust = request.meta.get('priority_adjust', self.priority_adjust)
return get_retry_request(
request,
reason=reason,
spider=spider,
max_retry_times=max_retry_times,
priority_adjust=priority_adjust,
)
环节说明:代码一共也就 94 行,但是却能实现多个功能。在好奇心的驱使下,我们还是对源码进行一一讲解分析吧。
from_crawler 函数
# 类方法,创建当前class的实例对象,参数:当前spider settings对象
def from_crawler(cls, crawler):
return cls(crawler.settings)
__init__ 函数
"""
这里涉及到了settings.py配置文件中定义的一些参数。
RETRY_ENABLED: 用于开启中间件,默认为True
RETRY_TIMES: 重试次数, 默认为2
RETRY_HTTP_CODES: 遇到哪些返回状态码需要重试, 一个列表,默认为[500, 503, 504, 400, 408]
RETRY_PRIORITY_ADJUST:调整相对于原始请求的重试请求优先级,默认为-1
"""
def __init__(self, settings):
if not settings.getbool('RETRY_ENABLED'):
raise NotConfigured
self.max_retry_times = settings.getint('RETRY_TIMES')
self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
self.priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST')
process_response 函数
process_exception 函数
EXCEPTIONS_TO_RETRY = (defer.TimeoutError, TimeoutError, DNSLookupError,
ConnectionRefusedError, ConnectionDone, ConnectError,
ConnectionLost, TCPTimedOutError, ResponseFailed,
IOError, TunnelError)
def process_response(self, request, response, spider):
# 处理request请求,确定是否需要请求重试,重试触发机制,前面提到的问题.
if request.meta.get('dont_retry', False):
return response
# 检查response状态码是否在重试机制list中,如果存在就要调用_retry方法进行重试
if response.status in self.retry_http_codes:
reason = response_status_message(response.status)
return self._retry(request, reason, spider) or response
# 不存在会返回response,但会被spider parse方法是过滤掉,只处理200状态码
return response
def process_exception(self, request, exception, spider):
# 如果产生了EXCEPTIONS_TO_RETRY列表中的异常错误并且重试机制为开启状态,则会调用_retry方法进行重试。
if (
isinstance(exception, self.EXCEPTIONS_TO_RETRY)
and not request.meta.get('dont_retry', False)
):
return self._retry(request, exception, spider)
_retry 函数
get_retry_request 函数
# 该方法获取最大重试次数,和请求重试优先级,然后调用get_retry_request方法
def _retry(self, request, reason, spider):
max_retry_times = request.meta.get('max_retry_times', self.max_retry_times)
priority_adjust = request.meta.get('priority_adjust', self.priority_adjust)
return get_retry_request(
request,
reason=reason,
spider=spider,
max_retry_times=max_retry_times,
priority_adjust=priority_adjust,
)
"""
读取当前重试次数和最大重试次数进行比较,
如果小于等于最大重试次数:
利用copy方法在原来的request上复制一个新request,并更新其retry_times,
并将dont_filter设为True来防止因url重复而被过滤。
如果超出最大重试次数:
记录重试失败请求量,并放弃该请求记录到logger日志中,logger级别为:error
"""
def get_retry_request(
request: Request,
*,
spider: Spider,
reason: Union[str, Exception] = 'unspecified',
max_retry_times: Optional[int] = None,
priority_adjust: Optional[int] = None,
logger: Logger = retry_logger,
stats_base_key: str = 'retry',
):
settings = spider.crawler.settings
stats = spider.crawler.stats
retry_times = request.meta.get('retry_times', 0) + 1
if max_retry_times is None:
max_retry_times = request.meta.get('max_retry_times')
if max_retry_times is None:
max_retry_times = settings.getint('RETRY_TIMES')
if retry_times <= max_retry_times:
logger.debug(
"Retrying %(request)s (failed %(retry_times)d times): %(reason)s",
{'request': request, 'retry_times': retry_times, 'reason': reason},
extra={'spider': spider}
)
new_request: Request = request.copy()
new_request.meta['retry_times'] = retry_times
new_request.dont_filter = True
if priority_adjust is None:
priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST')
new_request.priority = request.priority + priority_adjust
if callable(reason):
reason = reason()
if isinstance(reason, Exception):
reason = global_object_name(reason.__class__)
stats.inc_value(f'{stats_base_key}/count')
stats.inc_value(f'{stats_base_key}/reason_count/{reason}')
return new_request
else:
stats.inc_value(f'{stats_base_key}/max_reached')
logger.error(
"Gave up retrying %(request)s (failed %(retry_times)d times): "
"%(reason)s",
{'request': request, 'retry_times': retry_times, 'reason': reason},
extra={'spider': spider},
)
return None
环节总结:整个源码分析流程到这里就结束了,接下来我们一起进入源码重写环节来解决下娜娜遇到的问题吧,我相信大家会豁然开朗的。
四、源码重写
重写 RetryMiddleware 源码后完整代码如下:
class RetryMiddleware:
EXCEPTIONS_TO_RETRY = (defer.TimeoutError, TimeoutError, DNSLookupError,
ConnectionRefusedError, ConnectionDone, ConnectError,
ConnectionLost, TCPTimedOutError, ResponseFailed,
IOError, TunnelError)
def __init__(self, settings):
if not settings.getbool('RETRY_ENABLED'):
raise NotConfigured
self.max_retry_times = settings.getint('RETRY_TIMES')
self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
self.priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST')
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_response(self, request, response, spider):
if request.meta.get('dont_retry', False):
return response
if response.status in self.retry_http_codes: # 可以自定义重试状态码
reason = response_status_message(response.status)
response.last_content = request.meta
return self._retry(request, reason, spider) or response
return response
def process_exception(self, request, exception, spider):
if (
isinstance(exception, self.EXCEPTIONS_TO_RETRY)
and not request.meta.get('dont_retry', False)
):
return self._retry(request, exception, spider)
def _retry(self, request, reason, spider):
max_retry_times = request.meta.get('max_retry_times', self.max_retry_times)
priority_adjust = request.meta.get('priority_adjust', self.priority_adjust)
request.meta['proxy'] = "xxx:xxxx"
request.headers['Proxy-Authorization'] = "proxyauth"
return get_retry_request(
request,
reason=reason,
spider=spider,
max_retry_times=max_retry_times,
priority_adjust=priority_adjust,
)
重写总结:我们只需要在 _retry 函数中实时更换代理即可,如果涉及到代理池需要剔除失败代理的问题,同样在 _retry 函数中删除代理池中指定代理即可。我们还可以自定义重试机制状态码,大家可自行添加即可!
五、总结分享
通过本次案例分析,上面的几个问题我们都已经得到了答案。今天分享到这里就结束了,欢迎大家关注下期文章,我们不见不散⛽️。最后希望大家多多转发、点赞、在看支持一波