pin框架 200行PHP代码 实现无序的分片上传
最近在为自己开发的 pin 框架编写文档,想着写一个案例,于是拿出前阵子涉及到的上传文件的场景做了个案例,先分享出来。
编写HTML文件如下:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>
<body>
<!-- Loads the Uploader class served by the demo server. -->
<script src="//127.0.0.1:9501/upload.js"></script>
<script>
    // One shared uploader; the button below calls uploader.start().
    // Every statement is terminated explicitly so the script stays valid
    // even if the markup is minified onto a single line.
    let uploader = new Uploader();

    // Called when the file picker is closed without a selection.
    uploader.onCancel = function () {
        console.log('上传取消');
    };

    // Called on every XHR upload progress event.
    uploader.onProgress = function (progress) {
        console.log('上传进度', progress);
    };

    // Called when the server reports that a file is complete.
    uploader.onSuccess = function (res) {
        console.log('上传完成', res);
    };

    // Called once with the selected FileList before uploading begins.
    uploader.onStart = function (files) {
        console.log('开始上传,文件列表:', files);
    };

    // Called when any chunk fails or an error is thrown while uploading.
    uploader.onFail = function (err) {
        console.log('错误信息:', err);
    };
</script>
<button onclick="uploader.start({multiple:true})">upload</button>
</body>
</html>
其中的 upload.js 文件内容如下:
;class Uploader {
    /** @var {HTMLInputElement} Detached file input used to open the browser's file picker. */
    #uploadDom;
    /** @var {Boolean} True once the input's change event fired for the current selection. */
    #selected;
    /** @var {Number} Number of focus polls performed for the current selection. */
    #checkTimes;
    /** @var {Boolean} True between start() and the moment a selection or a cancel is detected. */
    #startSelect;
    /**
     * @var {number} Chunk size in bytes.
     * NOTE(review): 1024 * 516 may have been intended as 1024 * 512 - confirm before changing.
     */
    #chunkSize = 1024 * 516;
    /** @var {null|Function} Invoked when the picker is closed without choosing a file. */
    onCancel;
    /** @var {null|Function} Invoked with the selected FileList before uploading starts. */
    onStart;
    /** @var {null|Function} Invoked with a progress snapshot on XHR upload progress events. */
    onProgress;
    /** @var {null|Function} Invoked when a response carries a truthy `data` field. */
    onSuccess;
    /** @var {null|Function} Invoked with the error when an upload fails. */
    onFail;
    /**
     * State of the file currently being uploaded. `loaded` holds the bytes sent per chunk index.
     * @var {{count: Number, index: Number, total: Number, loaded: Number[], file: File|null}}
     */
    progress;

    /**
     * Creates the hidden file input and registers the window focus listener.
     *
     * @param {{accept?: string|null, multiple?: boolean}} [options]
     */
    constructor({accept = null, multiple = false} = {}) {
        this.#initUploadDom(accept, multiple);
        this.#initDocumentDom();
        this.#selected = false;
        this.#checkTimes = 0;
        this.#startSelect = false;
        this.onStart = null;
        this.onProgress = null;
        this.onSuccess = null;
        this.onFail = null;
        this.onCancel = null;
        this.progress = {count: 0, index: 0, total: 0, loaded: [], file: null};
    }

    /**
     * Opens the file picker. Uploading starts from the input's change event.
     * Truthy options override the input's current settings; falsy ones leave them untouched.
     *
     * @param {{accept?: string|null, multiple?: boolean}} [options]
     */
    start({accept = null, multiple = false} = {}) {
        this.#startSelect = true;
        this.#selected = false;
        this.#checkTimes = 0;
        // Clearing the value lets the same file be picked twice in a row.
        this.#uploadDom.value = '';
        if (accept) this.#uploadDom.accept = accept;
        if (multiple) this.#uploadDom.multiple = multiple;
        this.#uploadDom.click();
        this.progress = {count: 0, index: 0, total: 0, loaded: [], file: null};
    }

    /**
     * Change-event handler: uploads the selected files one after another.
     * Any error is reported through onFail instead of being thrown.
     */
    async #startUpload() {
        try {
            this.#selected = true;
            const files = this.#uploadDom.files;
            this.onStart && this.onStart.call(this, files);
            this.progress.count = files.length;
            for (let i = 0; i < files.length; i++) {
                this.progress.index = i + 1;
                this.progress.file = files[i];
                this.progress.total = files[i].size;
                this.progress.loaded = [];
                await this.#doUpload(files[i]);
            }
        } catch (err) {
            this.onFail && this.onFail.call(this, err);
        }
    }

    /**
     * Uploads one file in chunks using six concurrent workers.
     * Worker `i` sends chunk indexes i, i + 6, i + 12, ... so chunks may arrive in any order.
     *
     * @param {File} file
     * @returns {Promise<void>} Resolves when every worker is done; rejects with the first chunk error.
     */
    #doUpload(file) {
        const fileSize = file.size;
        // At least one (possibly empty) chunk, so a zero-byte file is still sent to the server.
        const blockCount = Math.max(1, Math.ceil(fileSize / this.#chunkSize));
        const uploadHash = window.btoa(encodeURIComponent(Math.random() + (new Date()).toString()));
        const asyncNum = 6;

        const runWorker = async (offset) => {
            for (let index = offset; index < blockCount; index += asyncNum) {
                await new Promise((done, fail) =>
                    this.#doUploadChunk(file, fileSize, uploadHash, blockCount, index, done, fail));
            }
        };

        const workers = [];
        for (let i = 0; i < asyncNum; i++) {
            workers.push(runWorker(i));
        }
        return Promise.all(workers).then(() => undefined);
    }

    /**
     * Sends a single chunk with XMLHttpRequest.
     *
     * @param {File} file
     * @param {Number} fileSize
     * @param {string} uploadHash  Identifier shared by all chunks of this file
     * @param {Number} blockCount  Total number of chunks
     * @param {Number} index       Zero-based chunk index (sent to the server one-based)
     * @param {Function} finalDone Called when the chunk was accepted
     * @param {Function} ud        Called with `{status, response}` when the chunk failed
     */
    #doUploadChunk(file, fileSize, uploadHash, blockCount, index, finalDone, ud) {
        if (blockCount <= index) return finalDone();

        const min = index * this.#chunkSize;
        const max = Math.min((index + 1) * this.#chunkSize, fileSize);
        const form = new FormData();
        form.append('file', this.#getSlice(file, min, max));

        const xhr = new XMLHttpRequest();
        let isFail = false;
        const reject = () => {
            isFail = true;
            ud({status: xhr.status, response: xhr.response});
        };

        if (this.onProgress) {
            xhr.upload.onprogress = (e) => {
                if (isFail || !e.lengthComputable) return;
                // Scale the request's byte counters (which include multipart overhead) to the chunk size.
                this.progress.loaded[index] = Math.floor(e.loaded * (max - min) / e.total);
                this.onProgress.call(this, {
                    count: this.progress.count,
                    index: this.progress.index,
                    total: this.progress.total,
                    file: this.progress.file,
                    loaded: this.progress.loaded.reduce((prev, cur) => prev + cur, 0),
                });
            };
        }

        // Always attach the load handler: the worker promise must settle even when
        // no onSuccess callback is registered, otherwise the upload never advances.
        xhr.onload = () => {
            if (xhr.status !== 200) return reject();
            let res;
            try {
                res = JSON.parse(xhr.responseText);
            } catch (err) {
                // A non-JSON body is treated as a failed chunk instead of hanging the upload.
                return reject();
            }
            if (res && res.data && this.onSuccess) {
                this.onSuccess.call(this, {
                    count: this.progress.count,
                    index: this.progress.index,
                    total: this.progress.total,
                    file: this.progress.file,
                    result: res,
                });
            }
            finalDone();
        };
        xhr.onerror = reject;

        // The file name travels URL-encoded in the query string because header values
        // cannot carry characters outside Latin-1 (setRequestHeader throws for them).
        const url = this.currentUrl.protocol + '//' + this.currentUrl.host + '/upload'
            + '?file-name=' + encodeURIComponent(file.name);
        xhr.open('POST', url, true);
        // Keep the header for printable-ASCII names, where it is byte-identical to the query value.
        if (/^[\x20-\x7e]*$/.test(file.name)) {
            xhr.setRequestHeader('File-Name', file.name);
        }
        xhr.setRequestHeader('File-Hash', uploadHash);
        xhr.setRequestHeader('File-Count', blockCount);
        xhr.setRequestHeader('File-Index', index + 1);
        xhr.send(form);
    }

    /**
     * Returns the byte range [start, end) of the file, using a vendor-prefixed slice if needed.
     *
     * @param {File} file
     * @param {Number} start
     * @param {Number} end
     * @returns {Blob}
     */
    #getSlice(file, start, end) {
        if (File.prototype.slice) {
            return file.slice(start, end);
        } else if (File.prototype.mozSlice) {
            return file.mozSlice(start, end);
        } else if (File.prototype.webkitSlice) {
            return file.webkitSlice(start, end);
        }
    }

    /**
     * Window focus handler used to detect a cancelled picker: after focus returns it polls
     * every 10 ms, up to 50 times, for the change event; if none arrives it fires onCancel.
     */
    #documentFocus() {
        if (!this.#startSelect) return;
        this.#checkTimes++;
        if (this.#selected) {
            this.#startSelect = false;
        } else if (this.#checkTimes >= 50) {
            this.#startSelect = false;
            this.onCancel && this.onCancel.call(this);
        } else {
            setTimeout(this.#documentFocus.bind(this), 10);
        }
    }

    /**
     * Creates the detached `<input type="file">` and wires its change event to the upload.
     *
     * @param {string|null} accept
     * @param {boolean} multiple
     */
    #initUploadDom(accept = null, multiple = false) {
        this.#uploadDom = document.createElement('input');
        this.#uploadDom.type = 'file';
        if (accept) this.#uploadDom.accept = accept;
        if (multiple) this.#uploadDom.multiple = true;
        this.#uploadDom.addEventListener('change', this.#startUpload.bind(this));
    }

    /** Registers the window focus listener that drives cancel detection. */
    #initDocumentDom() {
        window.addEventListener('focus', this.#documentFocus.bind(this));
    }
}
// Records, once, the URL this script was loaded from; Uploader posts chunks to that origin.
;(function (d) {
    if (Uploader.prototype.currentUrl) return;

    // document.currentScript is the script being executed right now; the last <script>
    // in the document is only a fallback for environments where it is unavailable.
    const scripts = d.getElementsByTagName('script');
    const current = d.currentScript || scripts[scripts.length - 1];
    const src = current && current.src;

    // An inline script has no src; fall back to the page URL instead of letting new URL('') throw.
    Uploader.prototype.currentUrl = new URL(src || d.location.href);
})(document);
原理也很简单,在单击按钮的时候,触发 Uploader 对象的 start 方法,生成一个 input:file 并点击它选择文件,文件选好之后,按文件大小进行分片,然后按照6个请求的池子做并发上传(分片上传不分先后,可以是无序的)。
PHP的代码如下:
use Hyperf\Coroutine\Coroutine;use Hyperf\HttpMessage\Upload\UploadedFile;use Nash\Pin\Server\{Request, Response};use Hyperf\HttpMessage\Exception\HttpException;
require __DIR__ . '/vendor/autoload.php';
(function() {
/**
 * HTTP exception for application-level errors.
 *
 * The global exception handler renders instances of this class as a JSON body
 * built from the exception's code and message.
 */
class BusinessException extends HttpException
{
}
// Global exception handler.
App::addExceptionHandler(function (Throwable $throwable) {
    // Business errors get a JSON body; there is deliberately no return here, so the
    // HttpException branch below still applies the HTTP status code afterwards.
    if ($throwable instanceof BusinessException) {
        Response::instance()->json([
            'code' => $throwable->getCode(),
            'data' => null,
            'message' => $throwable->getMessage(),
        ]);
    }

    if ($throwable instanceof HttpException) {
        return Response::instance()->withStatus($throwable->getStatusCode());
    }

    // Anything else is answered with a 500 and the throwable is handed back to the caller.
    Response::instance()->withStatus(500);

    return $throwable;
});
// Global middleware: adds the CORS headers and answers preflight requests.
App::addMiddleware(function ($next) {
    Response::instance()->withHeader('Access-Control-Allow-Origin', '*');
    Response::instance()->withHeader(
        'Access-Control-Allow-Headers',
        'File-Hash, File-Name, File-Count, File-Index'
    );

    // OPTIONS requests are answered with 200 without running the next handler.
    if (Request::instance()->isMethod('OPTIONS')) {
        return Response::instance()->withStatus(200);
    }

    return $next();
});
// Read the client script once at start-up so each request is served from memory.
$js = file_get_contents(__DIR__ . '/upload.js');

// Serve the upload script.
App::get('/upload.js', function () use ($js) {
    return $js;
});
// Handle an upload request (a whole file or a single chunk).
App::post('/upload', function (Request $request) {
    $uploader = new Uploader($request);
    $fileName = $uploader
        ->setUploadDirectory(__DIR__ . '/upload')
        ->doUpload();

    // doUpload() yields the target path only on the request that completes the file.
    if ($fileName) {
        return ['code' => 0, 'data' => $fileName, 'message' => '文件上传完成'];
    }

    return ['code' => 0, 'data' => null, 'message' => '分片上传完成'];
});
/**
 * Stores one uploaded file, or one chunk of a chunked upload, and assembles the
 * chunks into the final file.
 *
 * Chunks may arrive in any order. The request that creates the task directory
 * starts a coroutine which appends chunks to the target file in index order while
 * holding an exclusive lock on it. The request that sees all chunk files present
 * waits for that lock, cleans up and returns the final path.
 */
class Uploader
{
    /** Seconds the merge coroutine waits for missing chunks before giving up. */
    private const MERGE_TIMEOUT = 7200;

    private UploadedFile $file;
    private string $hash;
    private string $name;
    private int $index;
    private int $count;
    private string $uploadDirectory;
    private string $uploadTaskDirectory;
    private string $targetName;
    private bool $isFirst = false;

    /**
     * @param Request $request Request carrying the uploaded part in the "file" field
     *
     * @throws BusinessException when no file was uploaded or the chunk parameters are invalid
     */
    public function __construct(Request $request)
    {
        if (!$request->hasFile('file')) {
            throw new BusinessException(400, '没有上传的文件', 10001);
        }

        $this->init($request);
        $this->setUploadDirectory('/tmp');
    }

    /**
     * Reads the upload parameters; query parameters take precedence over headers.
     * Without a hash the request is treated as a single-part upload with a random name.
     *
     * @param Request $request
     * @return void
     *
     * @throws BusinessException when the chunk index or count is out of range
     */
    private function init(Request $request): void
    {
        $this->file = $request->file('file');

        // The hash may be absent; normalise it to a string so the typed property accepts it.
        $hash = $request->query('file-hash', $request->header('File-Hash'));
        $this->hash = is_scalar($hash) ? (string)$hash : '';

        if ($this->hash === '') {
            $this->name = $this->hash = md5(mt_rand() . time());
            $this->index = $this->count = 1;

            return;
        }

        // The name is client-controlled and ends up in filesystem paths: keep only its
        // last path segment and fall back to a hash-derived name when nothing usable remains.
        $rawName = $request->query('file-name', $request->header('File-Name', $this->hash));
        $safeName = is_scalar($rawName) ? self::sanitizeName((string)$rawName) : '';
        $this->name = $safeName !== '' ? $safeName : md5($this->hash);

        $this->index = (int)$request->query('file-index', $request->header('File-Index', 1));
        $this->count = (int)$request->query('file-count', $request->header('File-Count', 1));

        // merge() consumes chunks 1..count, so anything outside that range can never be assembled.
        if ($this->count < 1 || $this->index < 1 || $this->index > $this->count) {
            throw new BusinessException(400, '分片参数不合法', 10002);
        }
    }

    /**
     * Reduces a client-supplied file name to a single path segment.
     *
     * @param string $name
     * @return string The last path segment, or '' when it is empty, "." or ".."
     */
    private static function sanitizeName(string $name): string
    {
        $name = str_replace(["\0", '\\'], ['', '/'], $name);
        $slash = strrpos($name, '/');
        if ($slash !== false) {
            $name = (string)substr($name, $slash + 1);
        }

        return ($name === '.' || $name === '..') ? '' : $name;
    }

    /**
     * @param string $uploadDirectory Directory the final file is written to
     * @return Uploader
     */
    public function setUploadDirectory(string $uploadDirectory): self
    {
        $this->uploadDirectory = rtrim($uploadDirectory, '/');

        return $this;
    }

    /**
     * Stores the uploaded part.
     *
     * @return string|null Path of the finished file, or null while chunks are still outstanding
     */
    public function doUpload(): ?string
    {
        $this->targetName = $this->uploadDirectory . '/' . $this->name;

        if ($this->count === 1) {
            $this->singleUpload();
        } else {
            $this->multipleUpload();
        }

        return $this->waitMerge();
    }

    /**
     * Decides whether this request completes the upload.
     *
     * @return string|null Path of the merged file when this request finishes the upload, else null
     */
    private function waitMerge(): ?string
    {
        if ($this->count === 1) {
            return $this->targetName;
        }

        // Only the request that created the task directory starts the merge coroutine.
        if ($this->isFirst) {
            Coroutine::create(fn() => $this->merge());
        }

        // Complete means: "." and ".." plus one .tmp and one .md5 file per chunk.
        $entries = scandir($this->uploadTaskDirectory);
        if ($entries === false || count($entries) !== 2 + 2 * $this->count) {
            return null;
        }

        // The non-blocking lock elects exactly one request to report completion.
        $lockFp = fopen($this->targetName . '.lock', 'w+');
        if ($lockFp === false) {
            return null;
        }
        if (!flock($lockFp, LOCK_EX | LOCK_NB)) {
            fclose($lockFp);

            return null;
        }

        // merge() holds an exclusive lock on the target until it has written every chunk;
        // acquiring the same lock therefore waits for the merge to finish.
        $fileFp = fopen($this->targetName, 'rb+');
        flock($fileFp, LOCK_EX);

        Coroutine::create(function () {
            $this->removeTaskDirectory();
            unlink($this->targetName . '.lock');
        });

        // Closing the handles releases both locks.
        fclose($fileFp);
        fclose($lockFp);

        return $this->targetName;
    }

    /**
     * Appends the chunks to the target file in index order, waiting for chunks that
     * have not arrived yet. Gives up and removes all artefacts after MERGE_TIMEOUT seconds.
     *
     * @return void
     */
    private function merge(): void
    {
        $deadline = time() + self::MERGE_TIMEOUT;
        $target = fopen($this->targetName, 'wb+');
        flock($target, LOCK_EX);

        $nextIndex = 1;
        $completed = false;

        while (time() <= $deadline) {
            $chunkFile = "{$this->uploadTaskDirectory}/{$nextIndex}-{$this->name}.tmp";

            // The .md5 marker is written after the chunk itself, so its presence
            // means the chunk file is completely stored.
            if (!file_exists($chunkFile) || !file_exists($chunkFile . '.md5')) {
                Coroutine::sleep(0.001);
                continue;
            }

            $chunk = fopen($chunkFile, 'rb');
            if ($chunk === false) {
                Coroutine::sleep(0.001);
                continue;
            }

            // Stream copy handles empty chunks and avoids loading the chunk into memory.
            stream_copy_to_stream($chunk, $target);
            fclose($chunk);

            if (++$nextIndex > $this->count) {
                $completed = true;
                break;
            }
        }

        fclose($target);

        // Clean up only when the merge really timed out, never after a successful merge.
        if (!$completed) {
            if (is_file($this->targetName)) {
                unlink($this->targetName);
            }
            $this->removeTaskDirectory();
        }
    }

    /**
     * Deletes the task directory with all chunk files.
     *
     * @return void
     */
    private function removeTaskDirectory(): void
    {
        exec('rm -rf ' . escapeshellarg($this->uploadTaskDirectory));
    }

    /**
     * Stores the chunk in the task directory and writes its .md5 marker file.
     *
     * @return void
     */
    private function multipleUpload(): void
    {
        $this->uploadTaskDirectory = $this->uploadDirectory . '/' . md5($this->hash);

        // Concurrent requests may race to create the directory; only the one whose mkdir
        // succeeds becomes "first". The warning of the losing mkdir is suppressed on purpose.
        if (!is_dir($this->uploadTaskDirectory)) {
            $this->isFirst = @mkdir($this->uploadTaskDirectory, 0755, true);
        }

        $filename = $this->save($this->uploadTaskDirectory . '/' . $this->index . '-' . $this->name . '.tmp');
        file_put_contents($filename . '.md5', md5_file($filename));
    }

    /**
     * Stores a single-part upload directly at the target path.
     *
     * @return void
     */
    private function singleUpload(): void
    {
        $this->save($this->targetName);
    }

    /**
     * Moves the uploaded part to the given path.
     *
     * @param string $fileName Destination path
     * @return string The destination path
     */
    private function save(string $fileName): string
    {
        $this->file->moveTo($fileName);

        return $fileName;
    }
}
})();
原理也很简单,就是根据分片的信息来创建任务的目录,如果创建成功的话,就新建协程异步地开启文件锁合并分片,当最后一个分片完成后,释放锁,最后一个分片获得锁成功的话,就可以得知文件合并已经完成,响应给前端即可。从这个例子也可以看到,使用 pin 框架,只需要180行PHP代码、220行JS代码即可完成一个分片上传的功能。
评论
