Building a Crawler Platform with the Hyperf Framework on Swoole: Setting Up and Running Scheduled Tasks

Nash


2023-10-10

In the previous installment we built a runnable command system and defined which arguments and options each command exposes on the platform. Today we wrap a command into an executable task. In short, we pick a command, assign values to the arguments and options it exposes, configure its scheduling mode and run cycle, and let the running service keep checking for due tasks and executing them.

As usual, we go straight to the hands-on part and fill in the theory next time. The format has also been upgraded: the whole walkthrough is now split into numbered steps.

1. Create the database migration files and define the table structure

Run `php bin/hyperf.php gen:migration create_spiders_table` to generate the migration file for the crawler task table, then edit it as follows:

```php
<?php

use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Migrations\Migration;

class CreateSpidersTable extends Migration
{
    /**
     * Run the migrations.
     */
    public function up(): void
    {
        Schema::create('spiders', function (Blueprint $table) {
            $table->bigIncrements('id');
            $table->string('name', 32)->comment('爬虫名称');
            $table->unsignedBigInteger('command_id')->comment('命令ID')->index('idx_command_id');
            $table->json('arguments')->comment('指令列表');
            $table->json('options')->comment('参数列表');
            $table->string('crontab_mode', 64)->comment('定时模式');
            $table->enum('state', ['enabled', 'disabled'])->comment('状态');
            $table->softDeletesTz();
            $table->timestampsTz();
        });
    }

    /**
     * Reverse the migrations.
     */
    public function down(): void
    {
        Schema::dropIfExists('spiders');
    }
}
```

Run `php bin/hyperf.php gen:migration create_spider_logs_table` to generate the migration file for the crawler log table, then edit it as follows:

```php
<?php

use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Migrations\Migration;

class CreateSpiderLogsTable extends Migration
{
    /**
     * Run the migrations.
     */
    public function up(): void
    {
        Schema::create('spider_logs', function (Blueprint $table) {
            $table->bigIncrements('id');
            $table->unsignedBigInteger('spider_id')->comment('爬虫ID')->index('idx_spider_id');
            $table->enum('event', ['trigger', 'start', 'print', 'finish'])->comment('事件名称:trigger 任务被触发;start 任务启动;print 任务回显;finish 任务结束');
            $table->longText('message')->comment('事件信息');
            $table->softDeletesTz();
            $table->timestampsTz();
        });
    }

    /**
     * Reverse the migrations.
     */
    public function down(): void
    {
        Schema::dropIfExists('spider_logs');
    }
}
```

Clearly, the spiders table records each task we create, namely which command it uses and with which argument and option values, while spider_logs records what happens each time a task runs.
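To make the two JSON columns concrete: arguments holds a JSON object keyed by the argument names the chosen command declares, and options holds one keyed by its option flags. A hypothetical row (the values are invented for illustration and only mirror the php command used later in this article) might look like this:

```php
<?php
// Hypothetical contents of one spiders row, for illustration only.
$row = [
    'name'         => 'example-spider',
    'command_id'   => 1,                             // assumed ID of a row in the commands table
    'arguments'    => '{"script": "bin/test.php"}',  // JSON keyed by the command's argument names
    'options'      => '{"-d": "memory_limit=1G"}',   // JSON keyed by the command's option flags
    'crontab_mode' => '* * * * *',                   // standard five-field cron expression
    'state'        => 'enabled',                     // or 'disabled'
];
```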

Run `php bin/hyperf.php migrate` to apply the migrations.

Run `php bin/hyperf.php gen:model spiders` and `php bin/hyperf.php gen:model spider_logs` to generate model classes for the two new tables.
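The generated SpiderLog model, which Spider::exec() writes to later on, is not printed in this article. A rough sketch of what the gen:model stub looks like after the usual touch-up is given below; treat it as an assumption, since the exact fillable and casts arrays depend on your generator output and on the shared App\Model\Model base class from the earlier installments:

```php
<?php

declare(strict_types=1);

namespace App\Model;

use Carbon\Carbon;

/**
 * Sketch of the spider_logs model; adjust to your actual gen:model output.
 *
 * @property int $id
 * @property int $spiderId
 * @property string $event
 * @property string $message
 * @property Carbon $deletedAt
 * @property Carbon $createdAt
 * @property Carbon $updatedAt
 */
class SpiderLog extends Model
{
    /** The table associated with the model. */
    protected ?string $table = 'spider_logs';

    /** The attributes that are mass assignable. */
    protected array $fillable = ['id', 'spider_id', 'event', 'message', 'deleted_at', 'created_at', 'updated_at'];

    /** The attributes that should be cast to native types. */
    protected array $casts = ['id' => 'integer', 'spider_id' => 'integer', 'created_at' => 'datetime', 'updated_at' => 'datetime', 'deleted_at' => 'datetime'];
}
```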

2. Implement the Spider execution flow

Run `composer require dragonmantank/cron-expression` to install the crontab expression parser, then modify the App\Model\Spider class to implement the spider's execution flow:

```php
<?php

declare(strict_types=1);

namespace App\Model;

use App\Exception\CommandException;
use Carbon\Carbon;
use Cron\CronExpression;
use Exception;
use Hyperf\Context\ApplicationContext;
use Hyperf\Redis\Redis;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use RedisException;
use Symfony\Component\Process\Process;

/**
 * @property int $id
 * @property string $name
 * @property int $commandId
 * @property string $arguments
 * @property string $options
 * @property string $crontabMode
 * @property string $state
 * @property Carbon $deletedAt
 * @property Carbon $createdAt
 * @property Carbon $updatedAt
 */
class Spider extends Model
{
    /**
     * The table associated with the model.
     */
    protected ?string $table = 'spiders';

    /**
     * The attributes that are mass assignable.
     */
    protected array $fillable = ['id', 'name', 'command_id', 'arguments', 'options', 'crontab_mode', 'state', 'deleted_at', 'created_at', 'updated_at'];

    /**
     * The attributes that should be cast to native types.
     */
    protected array $casts = ['id' => 'integer', 'command_id' => 'integer', 'created_at' => 'datetime', 'updated_at' => 'datetime', 'deleted_at' => 'datetime'];

    /** @var CronExpression|null */
    protected ?CronExpression $cronExpression = null;

    /**
     * @return self[]
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    public static function getDue(): array
    {
        return array_filter(array_map(
            fn ($item) => ($spider = unserialize($item)) instanceof Spider && $spider->cronExpression->isDue() ? $spider : null,
            ApplicationContext::getContainer()->get(Redis::class)->hGetAll('crontab')
        ));
    }

    /**
     * @return string|null
     */
    public function exec(): ?string
    {
        SpiderLog::create([
            'spiderId' => $this->id,
            'event' => 'trigger',
            'message' => json_encode(['time' => date('Y-m-d H:i:s')], JSON_UNESCAPED_UNICODE),
        ]);

        /** @var Command $command */
        $command = Command::query()->find($this->commandId);
        $argsData = json_decode($command->argument, true);
        $args = json_decode($this->arguments, true);
        $params = [];
        foreach ($argsData as $argsItem) {
            $params[] = $args[$argsItem['name']];
        }
        $options = json_decode($this->options, true);

        SpiderLog::create([
            'spiderId' => $this->id,
            'event' => 'start',
            'message' => json_encode(['arguments' => $params, 'options' => $options], JSON_UNESCAPED_UNICODE),
        ]);

        $leave = '';
        $command->exec($params, $options, function ($type, $buffer) use (&$leave) {
            SpiderLog::create([
                'spiderId' => $this->id,
                'event' => 'print',
                'message' => $buffer,
            ]);
            $arrayData = explode("\n", $leave . $buffer);
            $leave = array_pop($arrayData);
            foreach ($arrayData as $item) {
                var_dump($item); // submit to MongoDB here later
            }
        });

        try {
            $result = $command->wait();
        } catch (CommandException $e) {
            $result = $e->getMessage();
        }

        SpiderLog::create([
            'spiderId' => $this->id,
            'event' => 'finish',
            'message' => json_encode(['arguments' => $params, 'options' => $options], JSON_UNESCAPED_UNICODE),
        ]);

        return $result;
    }

    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    public static function cleanCache(): void
    {
        ApplicationContext::getContainer()->get(Redis::class)->del('crontab');
    }

    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    public function cache(): void
    {
        $this->cronExpression = new CronExpression($this->crontabMode);
        if ($this->state === 'enabled') {
            ApplicationContext::getContainer()->get(Redis::class)
                ->hSet('crontab', strval($this->id), serialize($this));
        } else {
            ApplicationContext::getContainer()->get(Redis::class)
                ->hDel('crontab', strval($this->id));
        }
    }
}
```

Using this class is fairly straightforward:

The static cleanCache method clears the entire Redis cache (the crontab hash).

The cache method syncs the current record into the Redis cache.

The static getDue method returns the Spider objects in the Redis cache whose schedule is currently due.

The exec method assembles the arguments, the options and the Command object for the current record, then calls the Command object's exec method to run the command, writing a SpiderLog record at each stage (trigger, start, print, finish). A short usage sketch of the whole lifecycle follows.
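Putting the four methods together, the intended lifecycle can be sketched as follows. This is illustrative only; in the actual project the same calls are made from the server callbacks and crontab tasks configured in the next steps:

```php
<?php
// Illustrative lifecycle sketch, not project code; it only uses the Spider methods shown above.

use App\Model\Spider;

// 1. At startup: drop the whole Redis hash and rebuild it from the enabled records.
Spider::cleanCache();
foreach (Spider::query()->where('state', 'enabled')->get() as $spider) {
    $spider->cache(); // serializes the model (including its CronExpression) into the `crontab` hash
}

// 2. Every minute: pick the spiders whose cron expression is due right now and run them.
foreach (Spider::getDue() as $spider) {
    $result = $spider->exec(); // writes trigger/start/print/finish rows to spider_logs
}
```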

3. Rebuild the full Spider cache when the service starts

Modify config\autoload\server.php and add an ON_START callback:

```php
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */

use App\Model\Spider;
use Hyperf\Server\Event;
use Hyperf\Server\Server;
use Swoole\Constant;

return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 9501,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
                Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],
    'settings' => [
        Constant::OPTION_ENABLE_COROUTINE => true,
        Constant::OPTION_WORKER_NUM => swoole_cpu_num(),
        Constant::OPTION_PID_FILE => BASE_PATH . '/runtime/hyperf.pid',
        Constant::OPTION_OPEN_TCP_NODELAY => true,
        Constant::OPTION_MAX_COROUTINE => 100000,
        Constant::OPTION_OPEN_HTTP2_PROTOCOL => true,
        Constant::OPTION_MAX_REQUEST => 100000,
        Constant::OPTION_SOCKET_BUFFER_SIZE => 2 * 1024 * 1024,
        Constant::OPTION_BUFFER_OUTPUT_SIZE => 2 * 1024 * 1024,
        Constant::OPTION_TASK_WORKER_NUM => swoole_cpu_num() * 4,
        Constant::OPTION_TASK_ENABLE_COROUTINE => false,
    ],
    'callbacks' => [
        Event::ON_WORKER_START => [Hyperf\Framework\Bootstrap\WorkerStartCallback::class, 'onWorkerStart'],
        Event::ON_PIPE_MESSAGE => [Hyperf\Framework\Bootstrap\PipeMessageCallback::class, 'onPipeMessage'],
        Event::ON_WORKER_EXIT => [Hyperf\Framework\Bootstrap\WorkerExitCallback::class, 'onWorkerExit'],
        Event::ON_TASK => [Hyperf\Framework\Bootstrap\TaskCallback::class, 'onTask'],
        Event::ON_FINISH => [Hyperf\Framework\Bootstrap\FinishCallback::class, 'onFinish'],
        Event::ON_START => function () {
            Spider::cleanCache();
            $spiders = Spider::query()->where('state', 'enabled')->get();
            foreach ($spiders as $spider) {
                $spider->cache();
            }
            var_dump('cache success!!!');
        },
    ],
];
```

As shown above, the added ON_START callback clears the whole Redis cache when the service starts and then caches every Spider record whose state is enabled.

4. Add timers that refresh the Spider cache incrementally and in full

Run `composer require hyperf/crontab` to add scheduled-task support to the system.

Modify the configuration file config\autoload\processes.php:

```php
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */

return [
    Hyperf\Crontab\Process\CrontabDispatcherProcess::class,
];
```

Create or modify the configuration file config\autoload\crontab.php:

```php
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */

return [
    'enable' => true,
];
```

Create the helper class App\Utils\CrontabExec:

```php
<?php

namespace App\Utils;

use App\Model\Spider;
use App\Process\CrontabDispatcherProcess;
use Carbon\Carbon;
use Hyperf\Crontab\Annotation\Crontab;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use RedisException;

class CrontabExec
{
    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    #[Crontab(rule: '* * * * *', name: 'crontab-cache', singleton: true, memo: '定时任务缓存更新命令')]
    public function cacheSpider(): void
    {
        $spiders = Spider::query()->where('state', 'enabled')
            ->where('updated_at', '>', (new Carbon())->subSeconds(90))
            ->get();
        foreach ($spiders as $spider) {
            $spider->cache();
        }
    }

    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    #[Crontab(rule: '0 0 * * *', name: 'crontab-cache-all', singleton: true, memo: '定时任务缓存全部命令')]
    public function cacheAllSpider(): void
    {
        $spiders = Spider::query()->get();
        foreach ($spiders as $spider) {
            $spider->cache();
        }
    }
}
```

The code above is also quite simple. It registers the crontab dispatcher process and defines two scheduled tasks: an incremental cache refresh that runs every minute and re-caches enabled spiders updated within the last 90 seconds (a window deliberately wider than the one-minute schedule so that no update slips through), and a full cache refresh that runs once a day.

5. Add a timer that triggers due Spider tasks

Add the class App\Process\CrontabDispatcherProcess:

```php
<?php

namespace App\Process;

use Hyperf\Context\ApplicationContext;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Coroutine\Concurrent;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Engine\Contract\ChannelInterface;
use Hyperf\Process\ProcessManager;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Throwable;

class CrontabDispatcherProcess extends \Hyperf\Crontab\Process\CrontabDispatcherProcess
{
    /** @var self|null */
    private static ?self $_instance;

    /** @var ?ChannelInterface */
    private ?ChannelInterface $channel;

    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function handle(): void
    {
        self::$_instance = $this;
        $this->channel = ApplicationContext::getContainer()->get(ChannelInterface::class);
        Coroutine::create(function () {
            $concurrent = new Concurrent(intval(
                ApplicationContext::getContainer()->get(ConfigInterface::class)->get('crontab.pool')
            ));
            while (ProcessManager::isRunning()) {
                try {
                    $concurrent->create($this->channel->pop());
                } catch (Throwable) {
                    continue;
                }
            }
        });
        parent::handle();
    }

    /**
     * @param callable $callback
     * @return bool
     */
    public static function pushExec(callable $callback): bool
    {
        return self::$_instance->channel->push($callback);
    }
}
```

Modify config\autoload\processes.php again (we already touched this file in the previous step; it now registers our custom crontab dispatcher process in place of the framework's default one):

```php
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */

return [
    App\Process\CrontabDispatcherProcess::class,
];
```

Modify config\autoload\crontab.php (add a pool setting):

```php
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */

return [
    'enable' => true,
    'pool' => 128,
];
```

Modify config\autoload\dependencies.php:

```php
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */

return [
    Hyperf\Engine\Contract\ChannelInterface::class => Hyperf\Engine\Channel::class,
    Hyperf\Crontab\Strategy\StrategyInterface::class => Hyperf\Crontab\Strategy\CoroutineStrategy::class,
];
```

Modify the App\Utils\CrontabExec class and add an execute method:

```php
<?php

namespace App\Utils;

use App\Model\Spider;
use App\Process\CrontabDispatcherProcess;
use Carbon\Carbon;
use Hyperf\Crontab\Annotation\Crontab;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use RedisException;

class CrontabExec
{
    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    #[Crontab(rule: '* * * * *', name: 'crontab-exec', singleton: false, memo: '定时任务调起执行脚本')]
    public function execute(): void
    {
        array_map(
            fn (Spider $spider) => CrontabDispatcherProcess::pushExec(fn () => $spider->exec()),
            Spider::getDue()
        );
    }

    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    #[Crontab(rule: '* * * * *', name: 'crontab-cache', singleton: true, memo: '定时任务缓存更新命令')]
    public function cacheSpider(): void
    {
        $spiders = Spider::query()->where('state', 'enabled')
            ->where('updated_at', '>', (new Carbon())->subSeconds(90))
            ->get();
        foreach ($spiders as $spider) {
            $spider->cache();
        }
    }

    /**
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws RedisException
     */
    #[Crontab(rule: '0 0 * * *', name: 'crontab-cache-all', singleton: true, memo: '定时任务缓存全部命令')]
    public function cacheAllSpider(): void
    {
        $spiders = Spider::query()->get();
        foreach ($spiders as $spider) {
            $spider->cache();
        }
    }
}
```

The basic flow implemented above is:

A custom CrontabDispatcherProcess class extends and replaces the framework's CrontabDispatcherProcess. Callbacks are pushed into and popped from a Channel, and every popped callback is handed to a coroutine pool for execution (see the standalone sketch after this list).

The CrontabExec class gains a timer that fires every minute and pushes the exec method of each due Spider object into the Channel as a callback.
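The Channel plus Concurrent pairing used above is worth isolating. The idea is a producer/consumer loop with a cap on how many callbacks run at the same time. Below is a minimal standalone sketch of that pattern, using the same Hyperf 3.x classes the project already imports; it has to run inside a coroutine context (for example inside the Hyperf runtime), and the names here are illustrative rather than project code:

```php
<?php
// Minimal sketch of the dispatch pattern, not project code.
// Producers push closures into a channel; one consumer coroutine pops them and
// hands each one to a Concurrent pool that limits how many run simultaneously.

use Hyperf\Coroutine\Concurrent;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Engine\Channel;

$channel = new Channel(1);          // dependencies.php binds ChannelInterface to this class
$concurrent = new Concurrent(128);  // mirrors the crontab.pool value configured above

// Consumer: in the real project this loop lives in the custom CrontabDispatcherProcess.
Coroutine::create(function () use ($channel, $concurrent) {
    while (true) {
        $callback = $channel->pop();
        if (!is_callable($callback)) {
            break; // pop() returns false once the channel is closed
        }
        // Blocks here whenever 128 callbacks are already running, providing back pressure.
        $concurrent->create($callback);
    }
});

// Producer: this is essentially what CrontabDispatcherProcess::pushExec() does for each due spider.
$channel->push(fn () => var_dump('spider job running'));
```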

Finally: verification

Create a new file bin\test.php:

```php
<?php

(function ($url) {
    function getDetailInformation(string $url): void
    {
        $curl = curl_init();
        curl_setopt($curl, CURLOPT_URL, $url);
        curl_setopt($curl, CURLOPT_HEADER, false);
        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, false);
        curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, false);
        if (!($content = curl_exec($curl))) {
            throw new Exception(curl_error($curl), curl_errno($curl));
        }
        $infos = explode('<div class="book-mid-info">', $content);
        foreach ($infos as $index => $info) {
            if ($index <= 0) {
                continue;
            }
            preg_match('/\\<a href="(\\/\\/www\\.qidian\\.com\\/book\\/\\d+\\/)" target="\\_blank" data\\-eid="qd\\_B58" data\\-bid="\d+" title="[\Wa-zA-Z0-9]+"\\>([\Wa-zA-Z0-9]+)\\<\\/a\\>/', $info, $matches);
            preg_match('/\\<a class="name" href="\\/\\/my\\.qidian\\.com\\/author\\/\\d+\\/" data\\-eid="qd\\_B59" target="\\_blank"\\>([\Wa-zA-Z0-9]+)\\<\\/a\\>/', $info, $author);
            echo json_encode(['url' => 'https:' . $matches[1], 'title' => $matches[2], 'author' => $author[1]], JSON_UNESCAPED_UNICODE), PHP_EOL;
        }
        curl_close($curl);
    }

    getDetailInformation($url);
})('https://www.qidian.com/free/all/action1-/');
```

This is a very simple crawler script. It fetches the first page of the free-novel listing on Qidian and prints the book list to standard output.

Update the arguments and options of the php command in the commands table:

(screenshot: updated arguments and options of the php command)

Here the unsafe -r option is removed and a script argument is added, so the command runs a script file instead of inline code.

(screenshot: crawler task configuration)

Then create a crawler task that calls the php command: set the script argument to the path of test.php, set the -d option to memory_limit=1G, and set the schedule mode to * * * * *, i.e. once per minute. Start the project and watch the console output:

(screenshot: console output)

Check the records in spider_logs:

(screenshot: spider_logs records)

From this we can see that our scheduled tasks are now working exactly as intended.
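Since the screenshots are only described here, the same task could also be seeded from code, roughly as below. This is a hedged sketch: the field names follow the spiders migration above, command_id is assumed to be the ID of the php command row, and the argument and option keys assume the command was updated as just described:

```php
<?php
// Hypothetical seeding snippet reproducing the configuration described above.

use App\Model\Spider;

$spider = Spider::create([
    'name'         => 'qidian-free-list',   // any descriptive name
    'command_id'   => 1,                    // assumed ID of the php command row
    'arguments'    => json_encode(['script' => 'bin/test.php']),
    'options'      => json_encode(['-d' => 'memory_limit=1G']),
    'crontab_mode' => '* * * * *',          // run once per minute
    'state'        => 'enabled',
]);

// Push it into the Redis `crontab` hash right away instead of waiting
// for the next incremental cache refresh.
$spider->cache();
```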


That's it for today. I am considering publishing the feature code and some reference documentation later on; if that interests you, feel free to follow me so we can study and discuss it together.
