Scrapy源码剖析:Scrapy是如何运行起来的?
阅读本文大约需要 15 分钟。 本文章代码较多,如果手机端阅读体验不好,建议先收藏后在 PC 端阅读。
scrapy 命令从哪来?
scrapy crawl
scrapy
命令从何而来?$ which scrapy
/usr/local/bin/scrapy
import re
import sys
from scrapy.cmdline import execute
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(execute())
setup.py
中,我们找到这个文件,就会发现在这个文件里,已经声明好了程序的运行入口处:from os.path import dirname, join
from setuptools import setup, find_packages
setup(
name='Scrapy',
version=version,
url='http://scrapy.org',
...
entry_points={ # 运行入口在这里:scrapy.cmdline:execute
'console_scripts': ['scrapy = scrapy.cmdline:execute']
},
classifiers=[
...
],
install_requires=[
...
],
)
entry_points
配置,它就是调用 Scrapy 开始的地方,也就是cmdline.py
的 execute
方法。setuptools
这个包管理工具,就会把上述代码生成好并放在可执行路径下,这样当我们调用 scrapy
命令时,就会调用 Scrapy 模块下的 cmdline.py
的 execute
方法。编写一个带有 main
方法的 Python 模块(首行必须注明 Python 执行路径)去掉 .py
后缀名修改权限为可执行( chmod +x
文件名)直接用文件名就可以执行这个 Python 文件
mycmd
,在这个文件中编写一个 main
方法,这个方法编写我们想要的执行的逻辑,之后执行 chmod +x mycmd
把这个文件权限变成可执行,最后通过 ./mycmd
就可以执行这段代码了,而不再需要通过 python
方式就可以执行了,是不是很简单?运行入口(execute.py)
scrapy/cmdline.py
的 execute
方法,那我们就看一下这个方法。def execute(argv=None, settings=None):
if argv is None:
argv = sys.argv
# --- 兼容低版本scrapy.conf.settings的配置 ---
if settings is None and 'scrapy.conf' in sys.modules:
from scrapy import conf
if hasattr(conf, 'settings'):
settings = conf.settings
# -----------------------------------------
# 初始化环境、获取项目配置参数 返回settings对象
if settings is None:
settings = get_project_settings()
# 校验弃用的配置项
check_deprecated_settings(settings)
# --- 兼容低版本scrapy.conf.settings的配置 ---
import warnings
from scrapy.exceptions import ScrapyDeprecationWarning
with warnings.catch_warnings():
warnings.simplefilter("ignore", ScrapyDeprecationWarning)
from scrapy import conf
conf.settings = settings
# ---------------------------------------
# 执行环境是否在项目中 主要检查scrapy.cfg配置文件是否存在
inproject = inside_project()
# 读取commands文件夹 把所有的命令类转换为{cmd_name: cmd_instance}的字典
cmds = _get_commands_dict(settings, inproject)
# 从命令行解析出执行的是哪个命令
cmdname = _pop_command_name(argv)
parser = optparse.OptionParser(formatter=optparse.TitledHelpFormatter(), \
conflict_handler='resolve')
if not cmdname:
_print_commands(settings, inproject)
sys.exit(0)
elif cmdname not in cmds:
_print_unknown_command(settings, cmdname, inproject)
sys.exit(2)
# 根据命令名称找到对应的命令实例
cmd = cmds[cmdname]
parser.usage = "scrapy %s %s" % (cmdname, cmd.syntax())
parser.description = cmd.long_desc()
# 设置项目配置和级别为command
settings.setdict(cmd.default_settings, priority='command')
cmd.settings = settings
# 添加解析规则
cmd.add_options(parser)
# 解析命令参数,并交由Scrapy命令实例处理
opts, args = parser.parse_args(args=argv[1:])
_run_print_help(parser, cmd.process_options, args, opts)
# 初始化CrawlerProcess实例 并给命令实例添加crawler_process属性
cmd.crawler_process = CrawlerProcess(settings)
# 执行命令实例的run方法
_run_print_help(parser, _run_command, cmd, args, opts)
sys.exit(cmd.exitcode)
初始化项目配置
scrapy.cfg
有关,通过调用 get_project_settings
方法,最终生成一个 Settings
实例。def get_project_settings():
# 环境变量中是否有SCRAPY_SETTINGS_MODULE配置
if ENVVAR not in os.environ:
project = os.environ.get('SCRAPY_PROJECT', 'default')
# 初始化环境 找到用户配置文件settings.py 设置到环境变量SCRAPY_SETTINGS_MODULE中
init_env(project)
# 加载默认配置文件default_settings.py 生成settings实例
settings = Settings()
# 取得用户配置文件
settings_module_path = os.environ.get(ENVVAR)
# 如果有用户配置 则覆盖默认配置
if settings_module_path:
settings.setmodule(settings_module_path, priority='project')
# 如果环境变量中有其他scrapy相关配置也覆盖
pickled_settings = os.environ.get("SCRAPY_PICKLED_SETTINGS_TO_OVERRIDE")
if pickled_settings:
settings.setdict(pickle.loads(pickled_settings), priority='project')
env_overrides = {k[7:]: v for k, v in os.environ.items() if
k.startswith('SCRAPY_')}
if env_overrides:
settings.setdict(env_overrides, priority='project')
return settings
default_settings.py
,主要逻辑在 Settings
类中。class Settings(BaseSettings):
def __init__(self, values=None, priority='project'):
# 调用父类构造初始化
super(Settings, self).__init__()
# 把default_settings.py的所有配置set到settings实例中
self.setmodule(default_settings, 'default')
# 把attributes属性也set到settings实例中
for name, val in six.iteritems(self):
if isinstance(val, dict):
self.set(name, BaseSettings(val, 'default'), 'default')
self.update(values, priority)
default_settings.py
中的所有配置项设置到 Settings
中,而且这个配置是有优先级的。default_settings.py
是非常重要的,我们读源码时有必要重点关注一下里面的内容,这里包含了所有组件的默认配置,以及每个组件的类模块,例如调度器类、爬虫中间件类、下载器中间件类、下载处理器类等等。# 下载器类
DOWNLOADER = 'scrapy.core.downloader.Downloader'
# 调度器类
CHEDULER = 'scrapy.core.scheduler.Scheduler'
# 调度队列类
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
检查运行环境是否在项目中
scrapy
命令有的是依赖项目运行的,有的命令则是全局的。这里主要通过就近查找 scrapy.cfg
文件来确定是否在项目环境中,主要逻辑在 inside_project
方法中。def inside_project():
# 检查此环境变量是否存在(上面已设置)
scrapy_module = os.environ.get('SCRAPY_SETTINGS_MODULE')
if scrapy_module is not None:
try:
import_module(scrapy_module)
except ImportError as exc:
warnings.warn("Cannot import scrapy settings module %s: %s" % (scrapy_module, exc))
else:
return True
# 如果环境变量没有 就近查找scrapy.cfg 找得到就认为是在项目环境中
return bool(closest_scrapy_cfg())
scrapy.cfg
文件,如果能找到,则说明是在爬虫项目中,否则就认为是执行的全局命令。组装命令实例集合
scrapy
包括很多命令,例如 scrapy crawl
、 scrapy fetch
等等,那这些命令是从哪来的?答案就在 _get_commands_dict
方法中。def _get_commands_dict(settings, inproject):
# 导入commands文件夹下的所有模块 生成{cmd_name: cmd}的字典集合
cmds = _get_commands_from_module('scrapy.commands', inproject)
cmds.update(_get_commands_from_entry_points(inproject))
# 如果用户自定义配置文件中有COMMANDS_MODULE配置 则加载自定义的命令类
cmds_module = settings['COMMANDS_MODULE']
if cmds_module:
cmds.update(_get_commands_from_module(cmds_module, inproject))
return cmds
def _get_commands_from_module(module, inproject):
d = {}
# 找到这个模块下所有的命令类(ScrapyCommand子类)
for cmd in _iter_command_classes(module):
if inproject or not cmd.requires_project:
# 生成{cmd_name: cmd}字典
cmdname = cmd.__module__.split('.')[-1]
d[cmdname] = cmd()
return d
def _iter_command_classes(module_name):
# 迭代这个包下的所有模块 找到ScrapyCommand的子类
for module in walk_modules(module_name):
for obj in vars(module).values():
if inspect.isclass(obj) and \
issubclass(obj, ScrapyCommand) and \
obj.__module__ == module.__name__:
yield obj
commands
文件夹下的所有模块,最终生成一个 {cmd_name: cmd}
字典集合,如果用户在配置文件中也配置了自定义的命令类,也会追加进去。也就是说,我们自己也可以编写自己的命令类,然后追加到配置文件中,之后就可以使用自己定义的命令了。解析命令
def _pop_command_name(argv):
i = 0
for arg in argv[1:]:
if not arg.startswith('-'):
del argv[i]
return arg
i += 1
scrapy crawl
,这个方法会解析出 crawl
,通过上面生成好的命令类的字典集合,就能找到 commands
目录下的 crawl.py
文件,最终执行的就是它的 Command
类。解析命令行参数
cmd.process_options
方法解析我们的参数:def process_options(self, args, opts):
# 首先调用了父类的process_options 解析统一固定的参数
ScrapyCommand.process_options(self, args, opts)
try:
# 命令行参数转为字典
opts.spargs = arglist_to_dict(opts.spargs)
except ValueError:
raise UsageError("Invalid -a value, use -a NAME=VALUE", print_help=False)
if opts.output:
if opts.output == '-':
self.settings.set('FEED_URI', 'stdout:', priority='cmdline')
else:
self.settings.set('FEED_URI', opts.output, priority='cmdline')
feed_exporters = without_none_values(
self.settings.getwithbase('FEED_EXPORTERS'))
valid_output_formats = feed_exporters.keys()
if not opts.output_format:
opts.output_format = os.path.splitext(opts.output)[1].replace(".", "")
if opts.output_format not in valid_output_formats:
raise UsageError("Unrecognized output format '%s', set one"
" using the '-t' switch or as a file extension"
" from the supported list %s" % (opts.output_format,
tuple(valid_output_formats)))
self.settings.set('FEED_FORMAT', opts.output_format, priority='cmdline')
初始化CrawlerProcess
CrawlerProcess
实例,然后运行对应命令实例的 run
方法。cmd.crawler_process = CrawlerProcess(settings)
_run_print_help(parser, _run_command, cmd, args, opts)
scrapy crawl
,也就是说最终调用的是 commands/crawl.py
的 run
方法:def run(self, args, opts):
if len(args) < 1:
raise UsageError()
elif len(args) > 1:
raise UsageError("running 'scrapy crawl' with more than one spider is no longer supported")
spname = args[0]
self.crawler_process.crawl(spname, **opts.spargs)
self.crawler_process.start()
run
方法中调用了 CrawlerProcess
实例的 crawl
和 start
方法,就这样整个爬虫程序就会运行起来了。CrawlerProcess
初始化:class CrawlerProcess(CrawlerRunner):
def __init__(self, settings=None):
# 调用父类初始化
super(CrawlerProcess, self).__init__(settings)
# 信号和log初始化
install_shutdown_handlers(self._signal_shutdown)
configure_logging(self.settings)
log_scrapy_info(self.settings)
CrawlerRunner
的构造方法:class CrawlerRunner(object):
def __init__(self, settings=None):
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
# 获取爬虫加载器
self.spider_loader = _get_spider_loader(settings)
self._crawlers = set()
self._active = set()
_get_spider_loader
方法:def _get_spider_loader(settings):
# 读取配置文件中的SPIDER_MANAGER_CLASS配置项
if settings.get('SPIDER_MANAGER_CLASS'):
warnings.warn(
'SPIDER_MANAGER_CLASS option is deprecated. '
'Please use SPIDER_LOADER_CLASS.',
category=ScrapyDeprecationWarning, stacklevel=2
)
cls_path = settings.get('SPIDER_MANAGER_CLASS',
settings.get('SPIDER_LOADER_CLASS'))
loader_cls = load_object(cls_path)
try:
verifyClass(ISpiderLoader, loader_cls)
except DoesNotImplement:
warnings.warn(
'SPIDER_LOADER_CLASS (previously named SPIDER_MANAGER_CLASS) does '
'not fully implement scrapy.interfaces.ISpiderLoader interface. '
'Please add all missing methods to avoid unexpected runtime errors.',
category=ScrapyDeprecationWarning, stacklevel=2
)
return loader_cls.from_settings(settings.frozencopy())
spider_loader
项,默认配置是 spiderloader.SpiderLoader
类,从名字我们也能看出来,这个类是用来加载我们编写好的爬虫类的,下面看一下这个类的具体实现。@implementer(ISpiderLoader)
class SpiderLoader(object):
def __init__(self, settings):
# 配置文件获取存放爬虫脚本的路径
self.spider_modules = settings.getlist('SPIDER_MODULES')
self._spiders = {}
# 加载所有爬虫
self._load_all_spiders()
def _load_spiders(self, module):
# 组装成{spider_name: spider_cls}的字典
for spcls in iter_spider_classes(module):
self._spiders[spcls.name] = spcls
def _load_all_spiders(self):
for name in self.spider_modules:
for module in walk_modules(name):
self._load_spiders(module)
{spider_name: spider_cls}
的字典,所以我们在执行 scarpy crawl
时,Scrapy 就能找到我们的爬虫类。运行爬虫
CrawlerProcess
初始化完之后,调用它的 crawl
方法:def crawl(self, crawler_or_spidercls, *args, **kwargs):
# 创建crawler
crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, *args, **kwargs)
def _crawl(self, crawler, *args, **kwargs):
self.crawlers.add(crawler)
# 调用Crawler的crawl方法
d = crawler.crawl(*args, **kwargs)
self._active.add(d)
def _done(result):
self.crawlers.discard(crawler)
self._active.discard(d)
return result
return d.addBoth(_done)
def create_crawler(self, crawler_or_spidercls):
if isinstance(crawler_or_spidercls, Crawler):
return crawler_or_spidercls
return self._create_crawler(crawler_or_spidercls)
def _create_crawler(self, spidercls):
# 如果是字符串 则从spider_loader中加载这个爬虫类
if isinstance(spidercls, six.string_types):
spidercls = self.spider_loader.load(spidercls)
# 否则创建Crawler
return Crawler(spidercls, self.settings)
Cralwer
实例,然后调用它的 crawl
方法:@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
assert not self.crawling, "Crawling already taking place"
self.crawling = True
try:
# 到现在 才是实例化一个爬虫实例
self.spider = self._create_spider(*args, **kwargs)
# 创建引擎
self.engine = self._create_engine()
# 调用爬虫类的start_requests方法
start_requests = iter(self.spider.start_requests())
# 执行引擎的open_spider 并传入爬虫实例和初始请求
yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)
except Exception:
if six.PY2:
exc_info = sys.exc_info()
self.crawling = False
if self.engine is not None:
yield self.engine.close()
if six.PY2:
six.reraise(*exc_info)
raise
def _create_spider(self, *args, **kwargs):
return self.spidercls.from_crawler(self, *args, **kwargs)
start_requests
方法获取种子 URL,最后交给引擎执行。Cralwer
是如何开始运行的额,也就是它的 start
方法:def start(self, stop_after_crawl=True):
if stop_after_crawl:
d = self.join()
if d.called:
return
d.addBoth(self._stop_reactor)
reactor.installResolver(self._get_dns_resolver())
# 配置reactor的池子大小(可修改REACTOR_THREADPOOL_MAXSIZE调整)
tp = reactor.getThreadPool()
tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE'))
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
# 开始执行
reactor.run(installSignalHandlers=False)
reactor
的模块。reactor
是个什么东西呢?它是 Twisted
模块的事件管理器,我们只要把需要执行的事件注册到 reactor
中,然后调用它的 run
方法,它就会帮我们执行注册好的事件,如果遇到网络IO等待,它会自动帮切换到可执行的事件上,非常高效。reactor
是如何工作的,你可以把它想象成一个线程池,只是采用注册回调的方式来执行事件。ExecuteEngine
处理了,引擎会协调多个组件,相互配合完成整个任务的执行。总结
更多阅读
特别推荐
点击下方阅读原文加入社区会员
评论